From patchwork Sun Feb 18 22:59:46 2024
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 39660
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][PATCH v2 1/8] hashserv: Add Unihash Garbage Collection
Date: Sun, 18 Feb 2024 15:59:46 -0700
Message-Id: <20240218225953.2997239-2-JPEWhacker@gmail.com>
X-Mailer: git-send-email 2.34.1
In-Reply-To: <20240218225953.2997239-1-JPEWhacker@gmail.com>
References: <20240218200743.2982923-1-JPEWhacker@gmail.com>
 <20240218225953.2997239-1-JPEWhacker@gmail.com>
MIME-Version: 1.0
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15923

Adds support for removing unused unihashes from the database. This is
done using a "mark and sweep" style of garbage collection, where a
collection is started by marking which unihashes should be kept in the
database, then performing a sweep to remove any unmarked hashes.

Signed-off-by: Joshua Watt
---
 bitbake/bin/bitbake-hashclient     |  35 +++++
 bitbake/lib/hashserv/client.py     |  31 ++++
 bitbake/lib/hashserv/server.py     | 105 ++++++++------
 bitbake/lib/hashserv/sqlalchemy.py | 226 ++++++++++++++++++++++++-----
 bitbake/lib/hashserv/sqlite.py     | 205 +++++++++++++++++++++-----
 bitbake/lib/hashserv/tests.py      | 198 +++++++++++++++++++++++++
 6 files changed, 684 insertions(+), 116 deletions(-)

diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient
index 2cb63386662..f71b87404ae 100755
--- a/bitbake/bin/bitbake-hashclient
+++ b/bitbake/bin/bitbake-hashclient
@@ -195,6 +195,28 @@ def main():
         columns = client.get_db_query_columns()
         print("\n".join(sorted(columns)))

+    def handle_gc_status(args, client):
+        result = client.gc_status()
+        if not result["mark"]:
+            print("No garbage collection in progress")
+            return 0
+
+        print("Current Mark: %s" % result["mark"])
+        print("Total hashes to keep: %d" % result["keep"])
+        print("Total hashes to remove: %d" % result["remove"])
+        return 0
+
+    def handle_gc_mark(args, client):
+        where = {k: v for k, v in args.where}
+        result = client.gc_mark(args.mark, where)
+        print("New hashes marked: %d" % result["count"])
+        return 0
+
+    def handle_gc_sweep(args, client):
+        result = client.gc_sweep(args.mark)
+        print("Removed %d rows" % result["count"])
+        return 0
+
     parser = argparse.ArgumentParser(description='Hash Equivalence Client')
     parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")')
     parser.add_argument('--log', default='WARNING', help='Set logging level')
@@ -274,6 +296,19 @@ def main():
     db_query_columns_parser = subparsers.add_parser('get-db-query-columns', help="Show columns that can be used in database queries")
     db_query_columns_parser.set_defaults(func=handle_get_db_query_columns)

+    gc_status_parser = subparsers.add_parser("gc-status", help="Show garbage collection status")
+    gc_status_parser.set_defaults(func=handle_gc_status)
+
+    gc_mark_parser = subparsers.add_parser('gc-mark', help="Mark hashes to be kept for garbage collection")
+    gc_mark_parser.add_argument("mark", help="Mark for this garbage collection operation")
+    gc_mark_parser.add_argument("--where", "-w", metavar="KEY VALUE", nargs=2, action="append", default=[],
+                                help="Keep entries in table where KEY == VALUE")
+    gc_mark_parser.set_defaults(func=handle_gc_mark)
+
+    gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked")
+    gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation")
+    gc_sweep_parser.set_defaults(func=handle_gc_sweep)
+
     args = parser.parse_args()

     logger = logging.getLogger('hashserv')
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 35a97687fbe..e6dc4179126 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -194,6 +194,34 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
         await
self._set_mode(self.MODE_NORMAL) return (await self.invoke({"get-db-query-columns": {}}))["columns"] + async def gc_status(self): + await self._set_mode(self.MODE_NORMAL) + return await self.invoke({"gc-status": {}}) + + async def gc_mark(self, mark, where): + """ + Starts a new garbage collection operation identified by "mark". If + garbage collection is already in progress with "mark", the collection + is continued. + + All unihash entries that match the "where" clause are marked to be + kept. In addition, any new entries added to the database after this + command will be automatically marked with "mark" + """ + await self._set_mode(self.MODE_NORMAL) + return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) + + async def gc_sweep(self, mark): + """ + Finishes garbage collection for "mark". All unihash entries that have + not been marked will be deleted. + + It is recommended to clean unused outhash entries after running this to + cleanup any dangling outhashes + """ + await self._set_mode(self.MODE_NORMAL) + return await self.invoke({"gc-sweep": {"mark": mark}}) + class Client(bb.asyncrpc.Client): def __init__(self, username=None, password=None): @@ -224,6 +252,9 @@ class Client(bb.asyncrpc.Client): "become_user", "get_db_usage", "get_db_query_columns", + "gc_status", + "gc_mark", + "gc_sweep", ) def _get_async_client(self): diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index a86507830e8..5ed852d1f30 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py @@ -199,7 +199,7 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): if not self.user_has_permissions(*permissions, allow_anon=allow_anon): if not self.user: username = "Anonymous user" - user_perms = self.anon_perms + user_perms = self.server.anon_perms else: username = self.user.username user_perms = self.user.permissions @@ -223,25 +223,11 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): class ServerClient(bb.asyncrpc.AsyncServerConnection): - def __init__( - self, - socket, - db_engine, - request_stats, - backfill_queue, - upstream, - read_only, - anon_perms, - ): - super().__init__(socket, "OEHASHEQUIV", logger) - self.db_engine = db_engine - self.request_stats = request_stats + def __init__(self, socket, server): + super().__init__(socket, "OEHASHEQUIV", server.logger) + self.server = server self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK - self.backfill_queue = backfill_queue - self.upstream = upstream - self.read_only = read_only self.user = None - self.anon_perms = anon_perms self.handlers.update( { @@ -261,13 +247,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): } ) - if not read_only: + if not self.server.read_only: self.handlers.update( { "report-equiv": self.handle_equivreport, "reset-stats": self.handle_reset_stats, "backfill-wait": self.handle_backfill_wait, "remove": self.handle_remove, + "gc-mark": self.handle_gc_mark, + "gc-sweep": self.handle_gc_sweep, + "gc-status": self.handle_gc_status, "clean-unused": self.handle_clean_unused, "refresh-token": self.handle_refresh_token, "set-user-perms": self.handle_set_perms, @@ -282,10 +271,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): def user_has_permissions(self, *permissions, allow_anon=True): permissions = set(permissions) if allow_anon: - if ALL_PERM in self.anon_perms: + if ALL_PERM in self.server.anon_perms: return True - if not permissions - self.anon_perms: + if not permissions - self.server.anon_perms: return True if 
self.user is None: @@ -303,10 +292,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): return self.proto_version > (1, 0) and self.proto_version <= (1, 1) async def process_requests(self): - async with self.db_engine.connect(self.logger) as db: + async with self.server.db_engine.connect(self.logger) as db: self.db = db - if self.upstream is not None: - self.upstream_client = await create_async_client(self.upstream) + if self.server.upstream is not None: + self.upstream_client = await create_async_client(self.server.upstream) else: self.upstream_client = None @@ -323,7 +312,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): if "stream" in k: return await self.handlers[k](msg[k]) else: - with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): + with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): return await self.handlers[k](msg[k]) raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) @@ -404,7 +393,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): # possible (which is why the request sample is handled manually # instead of using 'with', and also why logging statements are # commented out. - self.request_sample = self.request_stats.start_sample() + self.request_sample = self.server.request_stats.start_sample() request_measure = self.request_sample.measure() request_measure.start() @@ -435,7 +424,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): # Post to the backfill queue after writing the result to minimize # the turn around time on a request if upstream is not None: - await self.backfill_queue.put((method, taskhash)) + await self.server.backfill_queue.put((method, taskhash)) await self.socket.send("ok") return self.NO_RESPONSE @@ -461,7 +450,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): # report is made inside the function @permissions(READ_PERM) async def handle_report(self, data): - if self.read_only or not self.user_has_permissions(REPORT_PERM): + if self.server.read_only or not self.user_has_permissions(REPORT_PERM): return await self.report_readonly(data) outhash_data = { @@ -538,24 +527,24 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): @permissions(READ_PERM) async def handle_get_stats(self, request): return { - "requests": self.request_stats.todict(), + "requests": self.server.request_stats.todict(), } @permissions(DB_ADMIN_PERM) async def handle_reset_stats(self, request): d = { - "requests": self.request_stats.todict(), + "requests": self.server.request_stats.todict(), } - self.request_stats.reset() + self.server.request_stats.reset() return d @permissions(READ_PERM) async def handle_backfill_wait(self, request): d = { - "tasks": self.backfill_queue.qsize(), + "tasks": self.server.backfill_queue.qsize(), } - await self.backfill_queue.join() + await self.server.backfill_queue.join() return d @permissions(DB_ADMIN_PERM) @@ -566,6 +555,46 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): return {"count": await self.db.remove(condition)} + @permissions(DB_ADMIN_PERM) + async def handle_gc_mark(self, request): + condition = request["where"] + mark = request["mark"] + + if not isinstance(condition, dict): + raise TypeError("Bad condition type %s" % type(condition)) + + if not isinstance(mark, str): + raise TypeError("Bad mark type %s" % type(mark)) + + return {"count": await self.db.gc_mark(mark, condition)} + + @permissions(DB_ADMIN_PERM) + async def handle_gc_sweep(self, request): + mark = request["mark"] + + if not 
isinstance(mark, str): + raise TypeError("Bad mark type %s" % type(mark)) + + current_mark = await self.db.get_current_gc_mark() + + if not current_mark or mark != current_mark: + raise bb.asyncrpc.InvokeError( + f"'{mark}' is not the current mark. Refusing to sweep" + ) + + count = await self.db.gc_sweep() + + return {"count": count} + + @permissions(DB_ADMIN_PERM) + async def handle_gc_status(self, request): + (keep_rows, remove_rows, current_mark) = await self.db.gc_status() + return { + "keep": keep_rows, + "remove": remove_rows, + "mark": current_mark, + } + @permissions(DB_ADMIN_PERM) async def handle_clean_unused(self, request): max_age = request["max_age_seconds"] @@ -779,15 +808,7 @@ class Server(bb.asyncrpc.AsyncServer): ) def accept_client(self, socket): - return ServerClient( - socket, - self.db_engine, - self.request_stats, - self.backfill_queue, - self.upstream, - self.read_only, - self.anon_perms, - ) + return ServerClient(socket, self) async def create_admin_user(self): admin_permissions = (ALL_PERM,) diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index cee04bffb03..89a6b86d9d8 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -28,6 +28,7 @@ from sqlalchemy import ( delete, update, func, + inspect, ) import sqlalchemy.engine from sqlalchemy.orm import declarative_base @@ -36,16 +37,17 @@ from sqlalchemy.exc import IntegrityError Base = declarative_base() -class UnihashesV2(Base): - __tablename__ = "unihashes_v2" +class UnihashesV3(Base): + __tablename__ = "unihashes_v3" id = Column(Integer, primary_key=True, autoincrement=True) method = Column(Text, nullable=False) taskhash = Column(Text, nullable=False) unihash = Column(Text, nullable=False) + gc_mark = Column(Text, nullable=False) __table_args__ = ( UniqueConstraint("method", "taskhash"), - Index("taskhash_lookup_v3", "method", "taskhash"), + Index("taskhash_lookup_v4", "method", "taskhash"), ) @@ -79,6 +81,36 @@ class Users(Base): __table_args__ = (UniqueConstraint("username"),) +class Config(Base): + __tablename__ = "config" + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(Text, nullable=False) + value = Column(Text) + __table_args__ = ( + UniqueConstraint("name"), + Index("config_lookup", "name"), + ) + + +# +# Old table versions +# +DeprecatedBase = declarative_base() + + +class UnihashesV2(DeprecatedBase): + __tablename__ = "unihashes_v2" + id = Column(Integer, primary_key=True, autoincrement=True) + method = Column(Text, nullable=False) + taskhash = Column(Text, nullable=False) + unihash = Column(Text, nullable=False) + + __table_args__ = ( + UniqueConstraint("method", "taskhash"), + Index("taskhash_lookup_v3", "method", "taskhash"), + ) + + class DatabaseEngine(object): def __init__(self, url, username=None, password=None): self.logger = logging.getLogger("hashserv.sqlalchemy") @@ -91,6 +123,9 @@ class DatabaseEngine(object): self.url = self.url.set(password=password) async def create(self): + def check_table_exists(conn, name): + return inspect(conn).has_table(name) + self.logger.info("Using database %s", self.url) self.engine = create_async_engine(self.url, poolclass=NullPool) @@ -99,6 +134,24 @@ class DatabaseEngine(object): self.logger.info("Creating tables...") await conn.run_sync(Base.metadata.create_all) + if await conn.run_sync(check_table_exists, UnihashesV2.__tablename__): + self.logger.info("Upgrading Unihashes V2 -> V3...") + statement = insert(UnihashesV3).from_select( + ["id", "method", 
"unihash", "taskhash", "gc_mark"], + select( + UnihashesV2.id, + UnihashesV2.method, + UnihashesV2.unihash, + UnihashesV2.taskhash, + literal("").label("gc_mark"), + ), + ) + self.logger.debug("%s", statement) + await conn.execute(statement) + + await conn.run_sync(Base.metadata.drop_all, [UnihashesV2.__table__]) + self.logger.info("Upgrade complete") + def connect(self, logger): return Database(self.engine, logger) @@ -118,6 +171,15 @@ def map_user(row): ) +def _make_condition_statement(table, condition): + where = {} + for c in table.__table__.columns: + if c.key in condition and condition[c.key] is not None: + where[c] = condition[c.key] + + return [(k == v) for k, v in where.items()] + + class Database(object): def __init__(self, engine, logger): self.engine = engine @@ -135,17 +197,52 @@ class Database(object): await self.db.close() self.db = None + async def _execute(self, statement): + self.logger.debug("%s", statement) + return await self.db.execute(statement) + + async def _set_config(self, name, value): + while True: + result = await self._execute( + update(Config).where(Config.name == name).values(value=value) + ) + + if result.rowcount == 0: + self.logger.debug("Config '%s' not found. Adding it", name) + try: + await self._execute(insert(Config).values(name=name, value=value)) + except IntegrityError: + # Race. Try again + continue + + break + + def _get_config_subquery(self, name, default=None): + if default is not None: + return func.coalesce( + select(Config.value).where(Config.name == name).scalar_subquery(), + default, + ) + return select(Config.value).where(Config.name == name).scalar_subquery() + + async def _get_config(self, name): + result = await self._execute(select(Config.value).where(Config.name == name)) + row = result.first() + if row is None: + return None + return row.value + async def get_unihash_by_taskhash_full(self, method, taskhash): statement = ( select( OuthashesV2, - UnihashesV2.unihash.label("unihash"), + UnihashesV3.unihash.label("unihash"), ) .join( - UnihashesV2, + UnihashesV3, and_( - UnihashesV2.method == OuthashesV2.method, - UnihashesV2.taskhash == OuthashesV2.taskhash, + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, ), ) .where( @@ -164,12 +261,12 @@ class Database(object): async def get_unihash_by_outhash(self, method, outhash): statement = ( - select(OuthashesV2, UnihashesV2.unihash.label("unihash")) + select(OuthashesV2, UnihashesV3.unihash.label("unihash")) .join( - UnihashesV2, + UnihashesV3, and_( - UnihashesV2.method == OuthashesV2.method, - UnihashesV2.taskhash == OuthashesV2.taskhash, + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, ), ) .where( @@ -208,13 +305,13 @@ class Database(object): statement = ( select( OuthashesV2.taskhash.label("taskhash"), - UnihashesV2.unihash.label("unihash"), + UnihashesV3.unihash.label("unihash"), ) .join( - UnihashesV2, + UnihashesV3, and_( - UnihashesV2.method == OuthashesV2.method, - UnihashesV2.taskhash == OuthashesV2.taskhash, + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, ), ) .where( @@ -234,12 +331,12 @@ class Database(object): async def get_equivalent(self, method, taskhash): statement = select( - UnihashesV2.unihash, - UnihashesV2.method, - UnihashesV2.taskhash, + UnihashesV3.unihash, + UnihashesV3.method, + UnihashesV3.taskhash, ).where( - UnihashesV2.method == method, - UnihashesV2.taskhash == taskhash, + UnihashesV3.method == method, + UnihashesV3.taskhash == 
taskhash, ) self.logger.debug("%s", statement) async with self.db.begin(): @@ -248,13 +345,9 @@ class Database(object): async def remove(self, condition): async def do_remove(table): - where = {} - for c in table.__table__.columns: - if c.key in condition and condition[c.key] is not None: - where[c] = condition[c.key] - + where = _make_condition_statement(table, condition) if where: - statement = delete(table).where(*[(k == v) for k, v in where.items()]) + statement = delete(table).where(*where) self.logger.debug("%s", statement) async with self.db.begin(): result = await self.db.execute(statement) @@ -263,19 +356,74 @@ class Database(object): return 0 count = 0 - count += await do_remove(UnihashesV2) + count += await do_remove(UnihashesV3) count += await do_remove(OuthashesV2) return count + async def get_current_gc_mark(self): + async with self.db.begin(): + return await self._get_config("gc-mark") + + async def gc_status(self): + async with self.db.begin(): + gc_mark_subquery = self._get_config_subquery("gc-mark", "") + + result = await self._execute( + select(func.count()) + .select_from(UnihashesV3) + .where(UnihashesV3.gc_mark == gc_mark_subquery) + ) + keep_rows = result.scalar() + + result = await self._execute( + select(func.count()) + .select_from(UnihashesV3) + .where(UnihashesV3.gc_mark != gc_mark_subquery) + ) + remove_rows = result.scalar() + + return (keep_rows, remove_rows, await self._get_config("gc-mark")) + + async def gc_mark(self, mark, condition): + async with self.db.begin(): + await self._set_config("gc-mark", mark) + + where = _make_condition_statement(UnihashesV3, condition) + if not where: + return 0 + + result = await self._execute( + update(UnihashesV3) + .values(gc_mark=self._get_config_subquery("gc-mark", "")) + .where(*where) + ) + return result.rowcount + + async def gc_sweep(self): + async with self.db.begin(): + result = await self._execute( + delete(UnihashesV3).where( + # A sneaky conditional that provides some errant use + # protection: If the config mark is NULL, this will not + # match any rows because No default is specified in the + # select statement + UnihashesV3.gc_mark + != self._get_config_subquery("gc-mark") + ) + ) + await self._set_config("gc-mark", None) + + return result.rowcount + async def clean_unused(self, oldest): statement = delete(OuthashesV2).where( OuthashesV2.created < oldest, ~( - select(UnihashesV2.id) + select(UnihashesV3.id) .where( - UnihashesV2.method == OuthashesV2.method, - UnihashesV2.taskhash == OuthashesV2.taskhash, + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, ) .limit(1) .exists() @@ -287,15 +435,17 @@ class Database(object): return result.rowcount async def insert_unihash(self, method, taskhash, unihash): - statement = insert(UnihashesV2).values( - method=method, - taskhash=taskhash, - unihash=unihash, - ) - self.logger.debug("%s", statement) try: async with self.db.begin(): - await self.db.execute(statement) + await self._execute( + insert(UnihashesV3).values( + method=method, + taskhash=taskhash, + unihash=unihash, + gc_mark=self._get_config_subquery("gc-mark", ""), + ) + ) + return True except IntegrityError: self.logger.debug( @@ -418,7 +568,7 @@ class Database(object): async def get_query_columns(self): columns = set() - for table in (UnihashesV2, OuthashesV2): + for table in (UnihashesV3, OuthashesV2): for c in table.__table__.columns: if not isinstance(c.type, Text): continue diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py index 
f93cb2c1dd9..608490730d7 100644 --- a/bitbake/lib/hashserv/sqlite.py +++ b/bitbake/lib/hashserv/sqlite.py @@ -15,6 +15,7 @@ UNIHASH_TABLE_DEFINITION = ( ("method", "TEXT NOT NULL", "UNIQUE"), ("taskhash", "TEXT NOT NULL", "UNIQUE"), ("unihash", "TEXT NOT NULL", ""), + ("gc_mark", "TEXT NOT NULL", ""), ) UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION) @@ -44,6 +45,14 @@ USERS_TABLE_DEFINITION = ( USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION) +CONFIG_TABLE_DEFINITION = ( + ("name", "TEXT NOT NULL", "UNIQUE"), + ("value", "TEXT", ""), +) + +CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION) + + def _make_table(cursor, name, definition): cursor.execute( """ @@ -71,6 +80,35 @@ def map_user(row): ) +def _make_condition_statement(columns, condition): + where = {} + for c in columns: + if c in condition and condition[c] is not None: + where[c] = condition[c] + + return where, " AND ".join("%s=:%s" % (k, k) for k in where.keys()) + + +def _get_sqlite_version(cursor): + cursor.execute("SELECT sqlite_version()") + + version = [] + for v in cursor.fetchone()[0].split("."): + try: + version.append(int(v)) + except ValueError: + version.append(v) + + return tuple(version) + + +def _schema_table_name(version): + if version >= (3, 33): + return "sqlite_schema" + + return "sqlite_master" + + class DatabaseEngine(object): def __init__(self, dbname, sync): self.dbname = dbname @@ -82,9 +120,10 @@ class DatabaseEngine(object): db.row_factory = sqlite3.Row with closing(db.cursor()) as cursor: - _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION) + _make_table(cursor, "unihashes_v3", UNIHASH_TABLE_DEFINITION) _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) _make_table(cursor, "users", USERS_TABLE_DEFINITION) + _make_table(cursor, "config", CONFIG_TABLE_DEFINITION) cursor.execute("PRAGMA journal_mode = WAL") cursor.execute( @@ -96,17 +135,38 @@ class DatabaseEngine(object): cursor.execute("DROP INDEX IF EXISTS outhash_lookup") cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2") cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2") + cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v3") # TODO: Upgrade from tasks_v2? 
cursor.execute("DROP TABLE IF EXISTS tasks_v2") # Create new indexes cursor.execute( - "CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)" + "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" ) cursor.execute( "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" ) + cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)") + + sqlite_version = _get_sqlite_version(cursor) + + cursor.execute( + f""" + SELECT name FROM {_schema_table_name(sqlite_version)} WHERE type = 'table' AND name = 'unihashes_v2' + """ + ) + if cursor.fetchone(): + self.logger.info("Upgrading Unihashes V2 -> V3...") + cursor.execute( + """ + INSERT INTO unihashes_v3 (id, method, unihash, taskhash, gc_mark) + SELECT id, method, unihash, taskhash, '' FROM unihashes_v2 + """ + ) + cursor.execute("DROP TABLE unihashes_v2") + db.commit() + self.logger.info("Upgrade complete") def connect(self, logger): return Database(logger, self.dbname, self.sync) @@ -126,16 +186,7 @@ class Database(object): "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF") ) - cursor.execute("SELECT sqlite_version()") - - version = [] - for v in cursor.fetchone()[0].split("."): - try: - version.append(int(v)) - except ValueError: - version.append(v) - - self.sqlite_version = tuple(version) + self.sqlite_version = _get_sqlite_version(cursor) async def __aenter__(self): return self @@ -143,6 +194,30 @@ class Database(object): async def __aexit__(self, exc_type, exc_value, traceback): await self.close() + async def _set_config(self, cursor, name, value): + cursor.execute( + """ + INSERT OR REPLACE INTO config (id, name, value) VALUES + ((SELECT id FROM config WHERE name=:name), :name, :value) + """, + { + "name": name, + "value": value, + }, + ) + + async def _get_config(self, cursor, name): + cursor.execute( + "SELECT value FROM config WHERE name=:name", + { + "name": name, + }, + ) + row = cursor.fetchone() + if row is None: + return None + return row["value"] + async def close(self): self.db.close() @@ -150,8 +225,8 @@ class Database(object): with closing(self.db.cursor()) as cursor: cursor.execute( """ - SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 - INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash ORDER BY outhashes_v2.created ASC LIMIT 1 @@ -167,8 +242,8 @@ class Database(object): with closing(self.db.cursor()) as cursor: cursor.execute( """ - SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 - INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash ORDER BY outhashes_v2.created ASC LIMIT 1 @@ -200,8 +275,8 @@ class Database(object): with closing(self.db.cursor()) as cursor: cursor.execute( """ - SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 - INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + 
SELECT outhashes_v2.taskhash AS taskhash, unihashes_v3.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash -- Select any matching output hash except the one we just inserted WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash -- Pick the oldest hash @@ -219,7 +294,7 @@ class Database(object): async def get_equivalent(self, method, taskhash): with closing(self.db.cursor()) as cursor: cursor.execute( - "SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash", + "SELECT taskhash, method, unihash FROM unihashes_v3 WHERE method=:method AND taskhash=:taskhash", { "method": method, "taskhash": taskhash, @@ -229,15 +304,9 @@ class Database(object): async def remove(self, condition): def do_remove(columns, table_name, cursor): - where = {} - for c in columns: - if c in condition and condition[c] is not None: - where[c] = condition[c] - + where, clause = _make_condition_statement(columns, condition) if where: - query = ("DELETE FROM %s WHERE " % table_name) + " AND ".join( - "%s=:%s" % (k, k) for k in where.keys() - ) + query = f"DELETE FROM {table_name} WHERE {clause}" cursor.execute(query, where) return cursor.rowcount @@ -246,17 +315,80 @@ class Database(object): count = 0 with closing(self.db.cursor()) as cursor: count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) - count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) + count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v3", cursor) self.db.commit() return count + async def get_current_gc_mark(self): + with closing(self.db.cursor()) as cursor: + return await self._get_config(cursor, "gc-mark") + + async def gc_status(self): + with closing(self.db.cursor()) as cursor: + cursor.execute( + """ + SELECT COUNT() FROM unihashes_v3 WHERE + gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') + """ + ) + keep_rows = cursor.fetchone()[0] + + cursor.execute( + """ + SELECT COUNT() FROM unihashes_v3 WHERE + gc_mark!=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') + """ + ) + remove_rows = cursor.fetchone()[0] + + current_mark = await self._get_config(cursor, "gc-mark") + + return (keep_rows, remove_rows, current_mark) + + async def gc_mark(self, mark, condition): + with closing(self.db.cursor()) as cursor: + await self._set_config(cursor, "gc-mark", mark) + + where, clause = _make_condition_statement(UNIHASH_TABLE_COLUMNS, condition) + + new_rows = 0 + if where: + cursor.execute( + f""" + UPDATE unihashes_v3 SET + gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') + WHERE {clause} + """, + where, + ) + new_rows = cursor.rowcount + + self.db.commit() + return new_rows + + async def gc_sweep(self): + with closing(self.db.cursor()) as cursor: + # NOTE: COALESCE is not used in this query so that if the current + # mark is NULL, nothing will happen + cursor.execute( + """ + DELETE FROM unihashes_v3 WHERE + gc_mark!=(SELECT value FROM config WHERE name='gc-mark') + """ + ) + count = cursor.rowcount + await self._set_config(cursor, "gc-mark", None) + + self.db.commit() + return count + async def clean_unused(self, oldest): with closing(self.db.cursor()) as cursor: cursor.execute( """ DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS ( - SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1 + 
SELECT unihashes_v3.id FROM unihashes_v3 WHERE unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash LIMIT 1 ) """, { @@ -271,7 +403,13 @@ class Database(object): prevrowid = cursor.lastrowid cursor.execute( """ - INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash) VALUES(:method, :taskhash, :unihash) + INSERT OR IGNORE INTO unihashes_v3 (method, taskhash, unihash, gc_mark) VALUES + ( + :method, + :taskhash, + :unihash, + COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') + ) """, { "method": method, @@ -383,14 +521,9 @@ class Database(object): async def get_usage(self): usage = {} with closing(self.db.cursor()) as cursor: - if self.sqlite_version >= (3, 33): - table_name = "sqlite_schema" - else: - table_name = "sqlite_master" - cursor.execute( f""" - SELECT name FROM {table_name} WHERE type = 'table' AND name NOT LIKE 'sqlite_%' + SELECT name FROM {_schema_table_name(self.sqlite_version)} WHERE type = 'table' AND name NOT LIKE 'sqlite_%' """ ) for row in cursor.fetchall(): diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 869f7636c53..aeedab3575e 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -810,6 +810,27 @@ class HashEquivalenceCommonTests(object): with self.auth_perms("@user-admin") as client: become = client.become_user(client.username) + def test_auth_gc(self): + admin_client = self.start_auth_server() + + with self.auth_perms() as client, self.assertRaises(InvokeError): + client.gc_mark("ABC", {"unihash": "123"}) + + with self.auth_perms() as client, self.assertRaises(InvokeError): + client.gc_status() + + with self.auth_perms() as client, self.assertRaises(InvokeError): + client.gc_sweep("ABC") + + with self.auth_perms("@db-admin") as client: + client.gc_mark("ABC", {"unihash": "123"}) + + with self.auth_perms("@db-admin") as client: + client.gc_status() + + with self.auth_perms("@db-admin") as client: + client.gc_sweep("ABC") + def test_get_db_usage(self): usage = self.client.get_db_usage() @@ -837,6 +858,147 @@ class HashEquivalenceCommonTests(object): data = client.get_taskhash(self.METHOD, taskhash, True) self.assertEqual(data["owner"], user["username"]) + def test_gc(self): + taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' + outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' + unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' + + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + + taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' + outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' + unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' + + result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Mark the first unihash to be kept + ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) + self.assertEqual(ret, {"count": 1}) + + ret = self.client.gc_status() + self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) + + # Second hash is still there; mark doesn't delete hashes + self.assertClientGetHash(self.client, taskhash2, unihash2) + + ret = self.client.gc_sweep("ABC") + self.assertEqual(ret, {"count": 1}) + + # Hash is gone. 
Taskhash is returned for second hash + self.assertClientGetHash(self.client, taskhash2, None) + # First hash is still present + self.assertClientGetHash(self.client, taskhash, unihash) + + def test_gc_switch_mark(self): + taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' + outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' + unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' + + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + + taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' + outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' + unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' + + result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Mark the first unihash to be kept + ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) + self.assertEqual(ret, {"count": 1}) + + ret = self.client.gc_status() + self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) + + # Second hash is still there; mark doesn't delete hashes + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Switch to a different mark and mark the second hash. This will start + # a new collection cycle + ret = self.client.gc_mark("DEF", {"unihash": unihash2, "method": self.METHOD}) + self.assertEqual(ret, {"count": 1}) + + ret = self.client.gc_status() + self.assertEqual(ret, {"mark": "DEF", "keep": 1, "remove": 1}) + + # Both hashes are still present + self.assertClientGetHash(self.client, taskhash2, unihash2) + self.assertClientGetHash(self.client, taskhash, unihash) + + # Sweep with the new mark + ret = self.client.gc_sweep("DEF") + self.assertEqual(ret, {"count": 1}) + + # First hash is gone, second is kept + self.assertClientGetHash(self.client, taskhash2, unihash2) + self.assertClientGetHash(self.client, taskhash, None) + + def test_gc_switch_sweep_mark(self): + taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' + outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' + unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' + + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + + taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' + outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' + unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' + + result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Mark the first unihash to be kept + ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) + self.assertEqual(ret, {"count": 1}) + + ret = self.client.gc_status() + self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) + + # Sweeping with a different mark raises an error + with self.assertRaises(InvokeError): + self.client.gc_sweep("DEF") + + # Both hashes are present + self.assertClientGetHash(self.client, taskhash2, unihash2) + self.assertClientGetHash(self.client, taskhash, unihash) + + def test_gc_new_hashes(self): + taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' + outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' + unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' + + result = 
self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + + # Start a new garbage collection + ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) + self.assertEqual(ret, {"count": 1}) + + ret = self.client.gc_status() + self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 0}) + + # Add second hash. It should inherit the mark from the current garbage + # collection operation + + taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' + outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' + unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' + + result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Sweep should remove nothing + ret = self.client.gc_sweep("ABC") + self.assertEqual(ret, {"count": 0}) + + # Both hashes are present + self.assertClientGetHash(self.client, taskhash2, unihash2) + self.assertClientGetHash(self.client, taskhash, unihash) + class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): def get_server_addr(self, server_idx): @@ -1086,6 +1248,42 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): "get-db-query-columns", ], check=True) + def test_gc(self): + taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' + outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' + unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' + + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + + taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' + outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' + unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' + + result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) + self.assertClientGetHash(self.client, taskhash2, unihash2) + + # Mark the first unihash to be kept + self.run_hashclient([ + "--address", self.server_address, + "gc-mark", "ABC", + "--where", "unihash", unihash, + "--where", "method", self.METHOD + ], check=True) + + # Second hash is still there; mark doesn't delete hashes + self.assertClientGetHash(self.client, taskhash2, unihash2) + + self.run_hashclient([ + "--address", self.server_address, + "gc-sweep", "ABC", + ], check=True) + + # Hash is gone. 
no unihash is returned for the second taskhash
+        self.assertClientGetHash(self.client, taskhash2, None)
+        # First hash is still present
+        self.assertClientGetHash(self.client, taskhash, unihash)
+

 class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
     def get_server_addr(self, server_idx):

From patchwork Sun Feb 18 22:59:47 2024
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 39664
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][PATCH v2 2/8] hashserv:
sqlalchemy: Use _execute() helper Date: Sun, 18 Feb 2024 15:59:47 -0700 Message-Id: <20240218225953.2997239-3-JPEWhacker@gmail.com> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20240218225953.2997239-1-JPEWhacker@gmail.com> References: <20240218200743.2982923-1-JPEWhacker@gmail.com> <20240218225953.2997239-1-JPEWhacker@gmail.com> MIME-Version: 1.0 List-Id: X-Webhook-Received: from li982-79.members.linode.com [45.33.32.79] by aws-us-west-2-korg-lkml-1.web.codeaurora.org with HTTPS for ; Sun, 18 Feb 2024 23:00:15 -0000 X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15924 Use the _execute() helper to execute queries. This helper does the logging of the statement that was being done manually everywhere. Signed-off-by: Joshua Watt --- bitbake/lib/hashserv/sqlalchemy.py | 297 ++++++++++++++--------------- 1 file changed, 140 insertions(+), 157 deletions(-) diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 89a6b86d9d8..873547809a0 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -233,124 +233,113 @@ class Database(object): return row.value async def get_unihash_by_taskhash_full(self, method, taskhash): - statement = ( - select( - OuthashesV2, - UnihashesV3.unihash.label("unihash"), - ) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.taskhash == taskhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + OuthashesV2, + UnihashesV3.unihash.label("unihash"), + ) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.taskhash == taskhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_unihash_by_outhash(self, method, outhash): - statement = ( - select(OuthashesV2, UnihashesV3.unihash.label("unihash")) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select(OuthashesV2, UnihashesV3.unihash.label("unihash")) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_outhash(self, method, outhash): - statement = ( - select(OuthashesV2) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select(OuthashesV2) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) 
return map_row(result.first()) async def get_equivalent_for_outhash(self, method, outhash, taskhash): - statement = ( - select( - OuthashesV2.taskhash.label("taskhash"), - UnihashesV3.unihash.label("unihash"), - ) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - OuthashesV2.taskhash != taskhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + OuthashesV2.taskhash.label("taskhash"), + UnihashesV3.unihash.label("unihash"), + ) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + OuthashesV2.taskhash != taskhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_equivalent(self, method, taskhash): - statement = select( - UnihashesV3.unihash, - UnihashesV3.method, - UnihashesV3.taskhash, - ).where( - UnihashesV3.method == method, - UnihashesV3.taskhash == taskhash, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + UnihashesV3.unihash, + UnihashesV3.method, + UnihashesV3.taskhash, + ).where( + UnihashesV3.method == method, + UnihashesV3.taskhash == taskhash, + ) + ) return map_row(result.first()) async def remove(self, condition): async def do_remove(table): where = _make_condition_statement(table, condition) if where: - statement = delete(table).where(*where) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute(delete(table).where(*where)) return result.rowcount return 0 @@ -417,21 +406,21 @@ class Database(object): return result.rowcount async def clean_unused(self, oldest): - statement = delete(OuthashesV2).where( - OuthashesV2.created < oldest, - ~( - select(UnihashesV3.id) - .where( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ) - .limit(1) - .exists() - ), - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + delete(OuthashesV2).where( + OuthashesV2.created < oldest, + ~( + select(UnihashesV3.id) + .where( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ) + .limit(1) + .exists() + ), + ) + ) return result.rowcount async def insert_unihash(self, method, taskhash, unihash): @@ -461,11 +450,9 @@ class Database(object): if "created" in data and not isinstance(data["created"], datetime): data["created"] = datetime.fromisoformat(data["created"]) - statement = insert(OuthashesV2).values(**data) - self.logger.debug("%s", statement) try: async with self.db.begin(): - await self.db.execute(statement) + await self._execute(insert(OuthashesV2).values(**data)) return True except IntegrityError: self.logger.debug( @@ -474,16 +461,16 @@ class Database(object): return False async def _get_user(self, username): - statement = select( - Users.username, - Users.permissions, - Users.token, - ).where( - Users.username == username, - ) - self.logger.debug("%s", statement) async with 
self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + Users.username, + Users.permissions, + Users.token, + ).where( + Users.username == username, + ) + ) return result.first() async def lookup_user_token(self, username): @@ -496,70 +483,66 @@ class Database(object): return map_user(await self._get_user(username)) async def set_user_token(self, username, token): - statement = ( - update(Users) - .where( - Users.username == username, - ) - .values( - token=token, - ) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + update(Users) + .where( + Users.username == username, + ) + .values( + token=token, + ) + ) return result.rowcount != 0 async def set_user_perms(self, username, permissions): - statement = ( - update(Users) - .where(Users.username == username) - .values(permissions=" ".join(permissions)) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + update(Users) + .where(Users.username == username) + .values(permissions=" ".join(permissions)) + ) return result.rowcount != 0 async def get_all_users(self): - statement = select( - Users.username, - Users.permissions, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + Users.username, + Users.permissions, + ) + ) return [map_user(row) for row in result] async def new_user(self, username, permissions, token): - statement = insert(Users).values( - username=username, - permissions=" ".join(permissions), - token=token, - ) - self.logger.debug("%s", statement) try: async with self.db.begin(): - await self.db.execute(statement) + await self._execute( + insert(Users).values( + username=username, + permissions=" ".join(permissions), + token=token, + ) + ) return True except IntegrityError as e: self.logger.debug("Cannot create new user %s: %s", username, e) return False async def delete_user(self, username): - statement = delete(Users).where(Users.username == username) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + delete(Users).where(Users.username == username) + ) return result.rowcount != 0 async def get_usage(self): usage = {} async with self.db.begin() as session: for name, table in Base.metadata.tables.items(): - statement = select(func.count()).select_from(table) - self.logger.debug("%s", statement) - result = await self.db.execute(statement) + result = await self._execute( + statement=select(func.count()).select_from(table) + ) usage[name] = { "rows": result.scalar(), }

From patchwork Sun Feb 18 22:59:48 2024 X-Patchwork-Submitter: Joshua Watt X-Patchwork-Id: 39661 From: Joshua Watt To: bitbake-devel@lists.openembedded.org Cc: Joshua Watt Subject: [bitbake-devel][PATCH v2 3/8] hashserv: Add unihash-exists API Date: Sun, 18 Feb 2024 15:59:48 -0700 Message-Id: <20240218225953.2997239-4-JPEWhacker@gmail.com> X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15925

Adds API to check if the server is aware of the existence of a given unihash. This can be used as an optimization for sstate where a client can query the hash equivalence server to check if a unihash exists before querying the sstate cache. If the hash server isn't aware of the existence of a unihash, then there is very likely not a matching sstate object, so this should be able to significantly cut down on the number of negative hits on the sstate cache.
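For illustration, a minimal sketch (not part of the patch) of how a build client could consult the new API before touching the sstate mirror; the server address and the fetch_from_sstate() helper are invented placeholders:

    import hashserv

    def fetch_if_known(unihash, fetch_from_sstate):
        # unihash_exists() is the new API added by this patch; the
        # address below is illustrative
        with hashserv.create_client("localhost:8686") as client:
            if not client.unihash_exists(unihash):
                # The server has never seen this unihash, so a matching
                # sstate object almost certainly does not exist; skip the
                # (potentially slow) negative mirror lookup entirely
                return None
            return fetch_from_sstate(unihash)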
Signed-off-by: Joshua Watt --- bitbake/bin/bitbake-hashclient | 13 +++++++ bitbake/lib/hashserv/client.py | 44 ++++++++++++++++----- bitbake/lib/hashserv/server.py | 61 +++++++++++++++++++----------- bitbake/lib/hashserv/sqlalchemy.py | 11 ++++++ bitbake/lib/hashserv/sqlite.py | 16 ++++++++ bitbake/lib/hashserv/tests.py | 39 +++++++++++++++++++ 6 files changed, 151 insertions(+), 33 deletions(-) diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index f71b87404ae..47dd27cd3c2 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient @@ -217,6 +217,14 @@ def main(): print("Removed %d rows" % result["count"]) return 0 + def handle_unihash_exists(args, client): + result = client.unihash_exists(args.unihash) + if args.quiet: + return 0 if result else 1 + + print("true" if result else "false") + return 0 + parser = argparse.ArgumentParser(description='Hash Equivalence Client') parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') parser.add_argument('--log', default='WARNING', help='Set logging level') @@ -309,6 +317,11 @@ def main(): gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") gc_sweep_parser.set_defaults(func=handle_gc_sweep) + unihash_exists_parser = subparsers.add_parser('unihash-exists', help="Check if a unihash is known to the server") + unihash_exists_parser.add_argument("--quiet", action="store_true", help="Don't print status. Instead, exit with 0 if unihash exists and 1 if it does not") + unihash_exists_parser.add_argument("unihash", help="Unihash to check") + unihash_exists_parser.set_defaults(func=handle_unihash_exists) + args = parser.parse_args() logger = logging.getLogger('hashserv') diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index e6dc4179126..daf1e128423 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -16,6 +16,7 @@ logger = logging.getLogger("hashserv.client") class AsyncClient(bb.asyncrpc.AsyncClient): MODE_NORMAL = 0 MODE_GET_STREAM = 1 + MODE_EXIST_STREAM = 2 def __init__(self, username=None, password=None): super().__init__("OEHASHEQUIV", "1.1", logger) @@ -49,19 +50,36 @@ class AsyncClient(bb.asyncrpc.AsyncClient): await self.socket.send("END") return await self.socket.recv() - if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: + async def normal_to_stream(command): + r = await self.invoke({command: None}) + if r != "ok": + raise ConnectionError( + f"Unable to transition to stream mode: Bad response from server {r!r}" + ) + + self.logger.debug("Mode is now %s", command) + + if new_mode == self.mode: + return + + self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode) + + # Always transition to normal mode before switching to any other mode + if self.mode != self.MODE_NORMAL: r = await self._send_wrapper(stream_to_normal) if r != "ok": self.check_invoke_error(r) - raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) - elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: - r = await self.invoke({"get-stream": None}) - if r != "ok": - raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) - elif new_mode != self.mode: - raise Exception( - "Undefined mode transition %r -> %r" % (self.mode, new_mode) - ) + raise ConnectionError( + f"Unable to transition to normal mode: Bad response from server {r!r}" + ) + self.logger.debug("Mode is now 
normal") + + if new_mode == self.MODE_GET_STREAM: + await normal_to_stream("get-stream") + elif new_mode == self.MODE_EXIST_STREAM: + await normal_to_stream("exists-stream") + elif new_mode != self.MODE_NORMAL: + raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") self.mode = new_mode @@ -95,6 +113,11 @@ class AsyncClient(bb.asyncrpc.AsyncClient): {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} ) + async def unihash_exists(self, unihash): + await self._set_mode(self.MODE_EXIST_STREAM) + r = await self.send_stream(unihash) + return r == "true" + async def get_outhash(self, method, outhash, taskhash, with_unihash=True): await self._set_mode(self.MODE_NORMAL) return await self.invoke( @@ -236,6 +259,7 @@ class Client(bb.asyncrpc.Client): "report_unihash", "report_unihash_equiv", "get_taskhash", + "unihash_exists", "get_outhash", "get_stats", "reset_stats", diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 5ed852d1f30..68f64f983b2 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py @@ -234,6 +234,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): "get": self.handle_get, "get-outhash": self.handle_get_outhash, "get-stream": self.handle_get_stream, + "exists-stream": self.handle_exists_stream, "get-stats": self.handle_get_stats, "get-db-usage": self.handle_get_db_usage, "get-db-query-columns": self.handle_get_db_query_columns, @@ -377,8 +378,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) await self.db.insert_outhash(data) - @permissions(READ_PERM) - async def handle_get_stream(self, request): + async def _stream_handler(self, handler): await self.socket.send_message("ok") while True: @@ -400,35 +400,50 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): if l == "END": break - (method, taskhash) = l.split() - # self.logger.debug('Looking up %s %s' % (method, taskhash)) - row = await self.db.get_equivalent(method, taskhash) - - if row is not None: - msg = row["unihash"] - # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) - elif self.upstream_client is not None: - upstream = await self.upstream_client.get_unihash(method, taskhash) - if upstream: - msg = upstream - else: - msg = "" - else: - msg = "" - + msg = await handler(l) await self.socket.send(msg) finally: request_measure.end() self.request_sample.end() - # Post to the backfill queue after writing the result to minimize - # the turn around time on a request - if upstream is not None: - await self.server.backfill_queue.put((method, taskhash)) - await self.socket.send("ok") return self.NO_RESPONSE + @permissions(READ_PERM) + async def handle_get_stream(self, request): + async def handler(l): + (method, taskhash) = l.split() + # self.logger.debug('Looking up %s %s' % (method, taskhash)) + row = await self.db.get_equivalent(method, taskhash) + + if row is not None: + # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) + return row["unihash"] + + if self.upstream_client is not None: + upstream = await self.upstream_client.get_unihash(method, taskhash) + if upstream: + await self.server.backfill_queue.put((method, taskhash)) + return upstream + + return "" + + return await self._stream_handler(handler) + + @permissions(READ_PERM) + async def handle_exists_stream(self, request): + async def handler(l): + if await self.db.unihash_exists(l): + return "true" + + 
if self.upstream_client is not None: + if await self.upstream_client.unihash_exists(l): + return "true" + + return "false" + + return await self._stream_handler(handler) + async def report_readonly(self, data): method = data["method"] outhash = data["outhash"] diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 873547809a0..0e28d738f5a 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -48,6 +48,7 @@ class UnihashesV3(Base): __table_args__ = ( UniqueConstraint("method", "taskhash"), Index("taskhash_lookup_v4", "method", "taskhash"), + Index("unihash_lookup_v1", "unihash"), ) @@ -279,6 +280,16 @@ class Database(object): ) return map_row(result.first()) + async def unihash_exists(self, unihash): + async with self.db.begin(): + result = await self._execute( + select(UnihashesV3) + .where(UnihashesV3.unihash == unihash) + .limit(1) + ) + + return result.first() is not None + async def get_outhash(self, method, outhash): async with self.db.begin(): result = await self._execute( diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py index 608490730d7..da2e844a031 100644 --- a/bitbake/lib/hashserv/sqlite.py +++ b/bitbake/lib/hashserv/sqlite.py @@ -144,6 +144,9 @@ class DatabaseEngine(object): cursor.execute( "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)" + ) cursor.execute( "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" ) @@ -255,6 +258,19 @@ class Database(object): ) return cursor.fetchone() + async def unihash_exists(self, unihash): + with closing(self.db.cursor()) as cursor: + cursor.execute( + """ + SELECT * FROM unihashes_v3 WHERE unihash=:unihash + LIMIT 1 + """, + { + "unihash": unihash, + }, + ) + return cursor.fetchone() is not None + async def get_outhash(self, method, outhash): with closing(self.db.cursor()) as cursor: cursor.execute( diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index aeedab3575e..fbbe81512a0 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -442,6 +442,11 @@ class HashEquivalenceCommonTests(object): self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') self.assertEqual(result['method'], self.METHOD) + def test_unihash_exists(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + self.assertTrue(self.client.unihash_exists(unihash)) + self.assertFalse(self.client.unihash_exists('6662e699d6e3d894b24408ff9a4031ef9b038ee8')) + def test_ro_server(self): rw_server = self.start_server() rw_client = self.start_client(rw_server.address) @@ -1031,6 +1036,40 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): def test_stress(self): self.run_hashclient(["--address", self.server_address, "stress"], check=True) + def test_unihash_exists(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", unihash, + ], check=True) + self.assertEqual(p.stdout.strip(), "true") + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', + ], check=True) + self.assertEqual(p.stdout.strip(), "false") + + def test_unihash_exists_quiet(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + + p = 
self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", unihash, + "--quiet", + ]) + self.assertEqual(p.returncode, 0) + self.assertEqual(p.stdout.strip(), "") + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', + "--quiet", + ]) + self.assertEqual(p.returncode, 1) + self.assertEqual(p.stdout.strip(), "") + def test_remove_taskhash(self): taskhash, outhash, unihash = self.create_test_hash(self.client) self.run_hashclient([

From patchwork Sun Feb 18 22:59:49 2024 X-Patchwork-Submitter: Joshua Watt X-Patchwork-Id: 39663 From: Joshua Watt To: bitbake-devel@lists.openembedded.org Cc: Joshua Watt Subject: [bitbake-devel][PATCH v2 4/8] asyncrpc: Add Client Pool object Date: Sun, 18 Feb 2024 15:59:49 -0700 Message-Id: <20240218225953.2997239-5-JPEWhacker@gmail.com> X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15926

Adds an abstract base class that can be used to implement a pool of client connections. The class implements a thread that runs an async event loop, and allows derived classes to schedule work on the loop and wait for the work to be finished.
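As a usage sketch (subclass, RPC call, and make_async_client() constructor below are all hypothetical; the base class only requires _new_client() to be implemented):

    from bb.asyncrpc import ClientPool

    class ExamplePool(ClientPool):
        def __init__(self, address, max_clients):
            super().__init__(max_clients)
            self.address = address

        async def _new_client(self):
            # Construct and return a connected async client; the actual
            # constructor is protocol-specific (invented name below)
            return await make_async_client(self.address)

    results = []

    async def fetch(client):
        # Tasks are async callables that borrow a pooled client while
        # they run on the pool's internal event loop
        results.append(await client.invoke({"ping": {}}))  # illustrative RPC

    with ExamplePool("localhost:8686", max_clients=4) as pool:
        pool.run_tasks([fetch] * 10)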
Signed-off-by: Joshua Watt --- bitbake/lib/bb/asyncrpc/__init__.py | 2 +- bitbake/lib/bb/asyncrpc/client.py | 77 +++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index a4371643d74..639e1607f8e 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py @@ -5,7 +5,7 @@ # -from .client import AsyncClient, Client +from .client import AsyncClient, Client, ClientPool from .serv import AsyncServer, AsyncServerConnection from .connection import DEFAULT_MAX_CHUNK from .exceptions import ( diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 0d7cd85780d..a6228bb0ba0 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py @@ -10,6 +10,8 @@ import json import os import socket import sys +import contextlib +from threading import Thread from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK from .exceptions import ConnectionClosedError, InvokeError @@ -180,3 +182,78 @@ class Client(object): def __exit__(self, exc_type, exc_value, traceback): self.close() return False + + +class ClientPool(object): + def __init__(self, max_clients): + self.avail_clients = [] + self.num_clients = 0 + self.max_clients = max_clients + self.loop = None + self.client_condition = None + + @abc.abstractmethod + async def _new_client(self): + raise NotImplementedError("Must be implemented in derived class") + + def close(self): + if self.client_condition: + self.client_condition = None + + if self.loop: + self.loop.run_until_complete(self.__close_clients()) + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) + self.loop.close() + self.loop = None + + def run_tasks(self, tasks): + if not self.loop: + self.loop = asyncio.new_event_loop() + + thread = Thread(target=self.__thread_main, args=(tasks,)) + thread.start() + thread.join() + + @contextlib.asynccontextmanager + async def get_client(self): + async with self.client_condition: + if self.avail_clients: + client = self.avail_clients.pop() + elif self.num_clients < self.max_clients: + self.num_clients += 1 + client = await self._new_client() + else: + while not self.avail_clients: + await self.client_condition.wait() + client = self.avail_clients.pop() + + try: + yield client + finally: + async with self.client_condition: + self.avail_clients.append(client) + self.client_condition.notify() + + def __thread_main(self, tasks): + async def process_task(task): + async with self.get_client() as client: + await task(client) + + asyncio.set_event_loop(self.loop) + if not self.client_condition: + self.client_condition = asyncio.Condition() + tasks = [process_task(t) for t in tasks] + self.loop.run_until_complete(asyncio.gather(*tasks)) + + async def __close_clients(self): + for c in self.avail_clients: + await c.close() + self.avail_clients = [] + self.num_clients = 0 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + return False

From patchwork Sun Feb 18 22:59:50 2024 X-Patchwork-Submitter: Joshua Watt X-Patchwork-Id: 39659 From: Joshua Watt To: bitbake-devel@lists.openembedded.org Cc: Joshua Watt Subject: [bitbake-devel][PATCH v2 5/8] hashserv: Add Client Pool Date: Sun, 18 Feb 2024 15:59:50 -0700 Message-Id: <20240218225953.2997239-6-JPEWhacker@gmail.com> X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15927

Implements a Client Pool derived from the AsyncRPC client pool that allows querying for multiple equivalent hashes in parallel.

Signed-off-by: Joshua Watt --- bitbake/lib/hashserv/client.py | 80 ++++++++++++++++++++++++++++++++ bitbake/lib/hashserv/tests.py | 83 ++++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index daf1e128423..b269879ecfd 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -283,3 +283,83 @@ class Client(bb.asyncrpc.Client): def _get_async_client(self): return AsyncClient(self.username, self.password) + + +class ClientPool(bb.asyncrpc.ClientPool): + def __init__( + self, + address, + max_clients, + *, + username=None, + password=None, + become=None, + ): + super().__init__(max_clients) + self.address = address + self.username = username + self.password = password + self.become = become + + async def _new_client(self): + client = await create_async_client( + self.address, + username=self.username, + password=self.password, + ) + if self.become: + await client.become_user(self.become) + return client + + def _run_key_tasks(self, queries, call): + results = {key: None for key in queries.keys()} + + def make_task(key, args): + async def task(client): + nonlocal results + unihash = await call(client, args) + results[key] = unihash + + return task + + def gen_tasks(): + for key, args in queries.items(): + yield make_task(key, args) + + self.run_tasks(gen_tasks()) + return results + + def get_unihashes(self, queries): + """ + Query multiple unihashes in parallel. + + The queries argument is a dictionary with arbitrary keys. The values + must be a tuple of (method, taskhash). + + Returns a dictionary with a corresponding key for each input key, and + the value is the queried unihash (which might be None if the query + failed) + """ + + async def call(client, args): + method, taskhash = args + return await client.get_unihash(method, taskhash) + + return self._run_key_tasks(queries, call) + + def unihashes_exist(self, queries): + """ + Query multiple unihash existence checks in parallel. + + The queries argument is a dictionary with arbitrary keys. The values + must be a unihash.
+ + Returns a dictionary with a corresponding key for each input key, and + the value is True or False if the unihash is known by the server (or + None if there was a failure) + """ + + async def call(client, unihash): + return await client.unihash_exists(unihash) + + return self._run_key_tasks(queries, call) diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index fbbe81512a0..0809453cf87 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -8,6 +8,7 @@ from . import create_server, create_client from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS from bb.asyncrpc import InvokeError +from .client import ClientPool import hashlib import logging import multiprocessing @@ -554,6 +555,88 @@ class HashEquivalenceCommonTests(object): # shares a taskhash with Task 2 self.assertClientGetHash(self.client, taskhash2, unihash2) + + def test_client_pool_get_unihashes(self): + TEST_INPUT = ( + # taskhash outhash unihash + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), + # Duplicated taskhash with multiple output hashes and unihashes. + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), + # Equivalent hash + ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), + ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), + ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), + ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), + ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), + ) + EXTRA_QUERIES = ( + "6b6be7a84ab179b4240c4302518dc3f6", + ) + + with ClientPool(self.server_address, 10) as client_pool: + for taskhash, outhash, unihash in TEST_INPUT: + self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + + query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} + for idx, taskhash in enumerate(EXTRA_QUERIES): + query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) + + result = client_pool.get_unihashes(query) + + self.assertDictEqual(result, { + 0: "218e57509998197d570e2c98512d0105985dffc9", + 1: "218e57509998197d570e2c98512d0105985dffc9", + 2: "218e57509998197d570e2c98512d0105985dffc9", + 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", + 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", + 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", + 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", + 7: None, + }) + + def test_client_pool_unihash_exists(self): + TEST_INPUT = ( + # taskhash outhash unihash + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), + # Duplicated taskhash with multiple output hashes and unihashes. 
+ ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), + # Equivalent hash + ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), + ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), + ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), + ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), + ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), + ) + EXTRA_QUERIES = ( + "6b6be7a84ab179b4240c4302518dc3f6", + ) + + result_unihashes = set() + + + with ClientPool(self.server_address, 10) as client_pool: + for taskhash, outhash, unihash in TEST_INPUT: + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + result_unihashes.add(result["unihash"]) + + query = {} + expected = {} + + for _, _, unihash in TEST_INPUT: + idx = len(query) + query[idx] = unihash + expected[idx] = unihash in result_unihashes + + + for unihash in EXTRA_QUERIES: + idx = len(query) + query[idx] = unihash + expected[idx] = False + + result = client_pool.unihashes_exist(query) + self.assertDictEqual(result, expected) + + def test_auth_read_perms(self): admin_client = self.start_auth_server()

From patchwork Sun Feb 18 22:59:51 2024 X-Patchwork-Submitter: Joshua Watt X-Patchwork-Id: 39662 From: Joshua Watt To: bitbake-devel@lists.openembedded.org Cc: Joshua Watt Subject: [bitbake-devel][PATCH v2 6/8] siggen: Add parallel query API Date: Sun, 18 Feb 2024 15:59:51 -0700 Message-Id: <20240218225953.2997239-7-JPEWhacker@gmail.com> X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15928

Implements a new API called get_unihashes() that allows querying multiple unihashes in parallel. The API is also reworked to make it easier for derived classes to interface with the new API in a consistent manner. Instead of overriding get_unihash() to add custom handling for local hash calculation (e.g. caches), derived classes should now override get_cached_unihash() and return the local unihash, or None if there isn't one.
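The pattern a derived signature generator is now expected to follow looks roughly like this (a sketch only; the class name, max_parallel value, and local_cache attribute are invented for illustration):

    class SignatureGeneratorExample(SignatureGeneratorUniHashMixIn, SignatureGeneratorBasicHash):
        name = "example"

        def init_rundepcheck(self, data):
            super().init_rundepcheck(data)
            self.server = data.getVar('BB_HASHSERVE')
            self.method = "sstate_output_hash"
            self.max_parallel = 8  # illustrative; 1 keeps queries serial

        def get_cached_unihash(self, tid):
            # Return a locally known unihash, or None to let the mixin
            # query the server (via the client pool when max_parallel > 1
            # and several tids are outstanding)
            return self.local_cache.get(tid)  # hypothetical local cache

Callers can then resolve many tasks at once with unihashes = siggen.get_unihashes(tids), which returns a dictionary mapping each tid to its unihash.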
Signed-off-by: Joshua Watt --- bitbake/lib/bb/siggen.py | 121 ++++++++++++++++++++++++++++----------- 1 file changed, 87 insertions(+), 34 deletions(-) diff --git a/bitbake/lib/bb/siggen.py b/bitbake/lib/bb/siggen.py index 58854aee76c..e1a4fa2cdd1 100644 --- a/bitbake/lib/bb/siggen.py +++ b/bitbake/lib/bb/siggen.py @@ -102,9 +102,18 @@ class SignatureGenerator(object): if flag: self.datacaches[mc].stamp_extrainfo[mcfn][t] = flag + def get_cached_unihash(self, tid): + return None + def get_unihash(self, tid): + unihash = self.get_cached_unihash(tid) + if unihash: + return unihash return self.taskhash[tid] + def get_unihashes(self, tids): + return {tid: self.get_unihash(tid) for tid in tids} + def prep_taskhash(self, tid, deps, dataCaches): return @@ -524,28 +533,37 @@ class SignatureGeneratorUniHashMixIn(object): super().__init__(data) def get_taskdata(self): - return (self.server, self.method, self.extramethod) + super().get_taskdata() + return (self.server, self.method, self.extramethod, self.max_parallel) + super().get_taskdata() def set_taskdata(self, data): - self.server, self.method, self.extramethod = data[:3] - super().set_taskdata(data[3:]) + self.server, self.method, self.extramethod, self.max_parallel = data[:4] + super().set_taskdata(data[4:]) def client(self): if getattr(self, '_client', None) is None: self._client = hashserv.create_client(self.server) return self._client + def client_pool(self): + if getattr(self, '_client_pool', None) is None: + self._client_pool = hashserv.client.ClientPool(self.server, self.max_parallel) + return self._client_pool + def reset(self, data): - if getattr(self, '_client', None) is not None: - self._client.close() - self._client = None + self.__close_clients() return super().reset(data) def exit(self): + self.__close_clients() + return super().exit() + + def __close_clients(self): if getattr(self, '_client', None) is not None: self._client.close() self._client = None - return super().exit() + if getattr(self, '_client_pool', None) is not None: + self._client_pool.close() + self._client_pool = None def get_stampfile_hash(self, tid): if tid in self.taskhash: @@ -578,7 +596,7 @@ class SignatureGeneratorUniHashMixIn(object): return None return unihash - def get_unihash(self, tid): + def get_cached_unihash(self, tid): taskhash = self.taskhash[tid] # If its not a setscene task we can return @@ -593,40 +611,74 @@ class SignatureGeneratorUniHashMixIn(object): self.unihash[tid] = unihash return unihash - # In the absence of being able to discover a unique hash from the - # server, make it be equivalent to the taskhash. The unique "hash" only - # really needs to be a unique string (not even necessarily a hash), but - # making it match the taskhash has a few advantages: - # - # 1) All of the sstate code that assumes hashes can be the same - # 2) It provides maximal compatibility with builders that don't use - # an equivalency server - # 3) The value is easy for multiple independent builders to derive the - # same unique hash from the same input. This means that if the - # independent builders find the same taskhash, but it isn't reported - # to the server, there is a better chance that they will agree on - # the unique hash. 
- unihash = taskhash + return None - try: - method = self.method - if tid in self.extramethod: - method = method + self.extramethod[tid] - data = self.client().get_unihash(method, self.taskhash[tid]) - if data: - unihash = data + def _get_method(self, tid): + method = self.method + if tid in self.extramethod: + method = method + self.extramethod[tid] + + return method + + def get_unihash(self, tid): + return self.get_unihashes([tid])[tid] + + def get_unihashes(self, tids): + """ + For an iterable of tids, returns a dictionary that maps each tid to a + unihash + """ + result = {} + queries = {} + query_result = {} + + for tid in tids: + unihash = self.get_cached_unihash(tid) + if unihash: + result[tid] = unihash + else: + queries[tid] = (self._get_method(tid), self.taskhash[tid]) + + if len(queries) == 0: + return result + + if self.max_parallel <= 1 or len(queries) <= 1: + # No parallelism required. Make the query serially with the single client + for tid, args in queries.items(): + query_result[tid] = self.client().get_unihash(*args) + else: + query_result = self.client_pool().get_unihashes(queries) + + for tid, unihash in query_result.items(): + # In the absence of being able to discover a unique hash from the + # server, make it be equivalent to the taskhash. The unique "hash" only + # really needs to be a unique string (not even necessarily a hash), but + # making it match the taskhash has a few advantages: + # + # 1) All of the sstate code that assumes hashes can be the same + # 2) It provides maximal compatibility with builders that don't use + # an equivalency server + # 3) The value is easy for multiple independent builders to derive the + # same unique hash from the same input. This means that if the + # independent builders find the same taskhash, but it isn't reported + # to the server, there is a better chance that they will agree on + # the unique hash. + taskhash = self.taskhash[tid] + if unihash: # A unique hash equal to the taskhash is not very interesting, # so it is reported at debug level 2.
If they differ, that # is much more interesting, so it is reported at debug level 1 hashequiv_logger.bbdebug((1, 2)[unihash == taskhash], 'Found unihash %s in place of %s for %s from %s' % (unihash, taskhash, tid, self.server)) else: hashequiv_logger.debug2('No reported unihash for %s:%s from %s' % (tid, taskhash, self.server)) - except ConnectionError as e: - bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) + unihash = taskhash - self.set_unihash(tid, unihash) - self.unihash[tid] = unihash - return unihash + + self.set_unihash(tid, unihash) + self.unihash[tid] = unihash + result[tid] = unihash + + return result def report_unihash(self, path, task, d): import importlib @@ -754,6 +806,7 @@ class SignatureGeneratorTestEquivHash(SignatureGeneratorUniHashMixIn, SignatureG super().init_rundepcheck(data) self.server = data.getVar('BB_HASHSERVE') self.method = "sstate_output_hash" + self.max_parallel = 1 def clean_checksum_file_path(file_checksum_tuple): f, cs = file_checksum_tuple

From patchwork Sun Feb 18 22:59:52 2024 X-Patchwork-Submitter: Joshua Watt X-Patchwork-Id: 39665 From: Joshua Watt To: bitbake-devel@lists.openembedded.org Cc: Joshua Watt Subject: [bitbake-devel][PATCH v2 7/8] siggen: Add parallel unihash exist API Date: Sun, 18 Feb 2024 15:59:52 -0700 Message-Id: <20240218225953.2997239-8-JPEWhacker@gmail.com> X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15929

Adds API to query if unihashes are known to the server in parallel.
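The call shape, for reference (the keys are arbitrary and the unihashes below are fabricated; the first mirrors a hash used elsewhere in this series' tests):

    query = {
        "mc:default:foo.bb:do_compile": "218e57509998197d570e2c98512d0105985dffc9",
        "mc:default:foo.bb:do_install": "0000000000000000000000000000000000000000",
    }
    exists = siggen.unihashes_exist(query)
    # e.g. {"mc:default:foo.bb:do_compile": True,
    #       "mc:default:foo.bb:do_install": False}
    # Positive answers are remembered in unihash_exists_cache; negative
    # answers are not, since a hash that is absent now may be reported
    # to the server later.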
Signed-off-by: Joshua Watt --- bitbake/lib/bb/siggen.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/bitbake/lib/bb/siggen.py b/bitbake/lib/bb/siggen.py index e1a4fa2cdd1..3ab8431a089 100644 --- a/bitbake/lib/bb/siggen.py +++ b/bitbake/lib/bb/siggen.py @@ -530,6 +530,11 @@ class SignatureGeneratorBasicHash(SignatureGeneratorBasic): class SignatureGeneratorUniHashMixIn(object): def __init__(self, data): self.extramethod = {} + # NOTE: The cache only tracks hashes that exist. Hashes that don't + # exist are always queried from the server since it is possible for + # hashes to appear over time, but much less likely for them to + # disappear + self.unihash_exists_cache = set() super().__init__(data) def get_taskdata(self): @@ -620,6 +625,33 @@ class SignatureGeneratorUniHashMixIn(object): return method + def unihashes_exist(self, query): + if len(query) == 0: + return {} + + uncached_query = {} + result = {} + for key, unihash in query.items(): + if unihash in self.unihash_exists_cache: + result[key] = True + else: + uncached_query[key] = unihash + + if self.max_parallel <= 1 or len(uncached_query) <= 1: + # No parallelism required. Make the query serially with the single client + uncached_result = { + key: self.client().unihash_exists(value) for key, value in uncached_query.items() + } + else: + uncached_result = self.client_pool().unihashes_exist(uncached_query) + + for key, exists in uncached_result.items(): + if exists: + self.unihash_exists_cache.add(query[key]) + result[key] = exists + + return result + def get_unihash(self, tid): + return self.get_unihashes([tid])[tid]

From patchwork Sun Feb 18 22:59:53 2024 X-Patchwork-Submitter: Joshua Watt X-Patchwork-Id: 39666 From: Joshua Watt To: bitbake-devel@lists.openembedded.org Cc: Tobias Hagelborn, Richard Purdie, Joshua Watt Subject: [bitbake-devel][PATCH v2 8/8] bitbake: hashserv: Postgres adaptations for ignoring duplicate inserts Date: Sun, 18 Feb 2024 15:59:53 -0700 Message-Id: <20240218225953.2997239-9-JPEWhacker@gmail.com> X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15930

From: Tobias Hagelborn

The Hash Equivalence server performs an unconditional insert even for duplicate hash entries, which causes excessive error log entries in Postgres. Ignore the duplicate inserts instead. The alternate behavior is isolated to the postgres engine type.
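For reference, a minimal standalone sketch of the technique the patch applies (SQLAlchemy's PostgreSQL-dialect insert with ON CONFLICT DO NOTHING), separate from the patch's own code; the hash values are placeholders:

    from sqlalchemy.dialects.postgresql import insert as postgres_insert

    stmt = (
        postgres_insert(UnihashesV3)
        .values(method="sstate_output_hash", taskhash="aa...", unihash="bb...")
        .on_conflict_do_nothing(index_elements=("method", "taskhash"))
    )
    # On PostgreSQL this renders roughly as:
    #   INSERT INTO unihashes_v3 (method, taskhash, unihash)
    #   VALUES (...) ON CONFLICT (method, taskhash) DO NOTHING
    # A suppressed duplicate reports rowcount == 0, which is why the
    # insert helpers below now return `result.rowcount != 0` instead of
    # unconditionally returning True.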
Signed-off-by: Tobias Hagelborn Signed-off-by: Richard Purdie Signed-off-by: Joshua Watt --- bitbake/lib/hashserv/sqlalchemy.py | 53 +++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 0e28d738f5a..fc3ae3d3396 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -33,6 +33,7 @@ from sqlalchemy import ( import sqlalchemy.engine from sqlalchemy.orm import declarative_base from sqlalchemy.exc import IntegrityError +from sqlalchemy.dialects.postgresql import insert as postgres_insert Base = declarative_base() @@ -283,9 +284,7 @@ class Database(object): async def unihash_exists(self, unihash): async with self.db.begin(): result = await self._execute( - select(UnihashesV3) - .where(UnihashesV3.unihash == unihash) - .limit(1) + select(UnihashesV3).where(UnihashesV3.unihash == unihash).limit(1) ) return result.first() is not None @@ -435,18 +434,30 @@ class Database(object): return result.rowcount async def insert_unihash(self, method, taskhash, unihash): - try: - async with self.db.begin(): - await self._execute( - insert(UnihashesV3).values( - method=method, - taskhash=taskhash, - unihash=unihash, - gc_mark=self._get_config_subquery("gc-mark", ""), - ) + # Postgres specific ignore on insert duplicate + if self.engine.name == "postgresql": + statement = ( + postgres_insert(UnihashesV3) + .values( + method=method, + taskhash=taskhash, + unihash=unihash, + gc_mark=self._get_config_subquery("gc-mark", ""), ) + .on_conflict_do_nothing(index_elements=("method", "taskhash")) + ) + else: + statement = insert(UnihashesV3).values( + method=method, + taskhash=taskhash, + unihash=unihash, + gc_mark=self._get_config_subquery("gc-mark", ""), ) - return True + try: + async with self.db.begin(): + result = await self._execute(statement) + return result.rowcount != 0 except IntegrityError: self.logger.debug( "%s, %s, %s already in unihash database", method, taskhash, unihash @@ -461,10 +472,22 @@ class Database(object): if "created" in data and not isinstance(data["created"], datetime): data["created"] = datetime.fromisoformat(data["created"]) + # Postgres specific ignore on insert duplicate + if self.engine.name == "postgresql": + statement = ( + postgres_insert(OuthashesV2) + .values(**data) + .on_conflict_do_nothing( + index_elements=("method", "taskhash", "outhash") + ) + ) + else: + statement = insert(OuthashesV2).values(**data) + try: async with self.db.begin(): - await self._execute(insert(OuthashesV2).values(**data)) - return True + result = await self._execute(statement) + return result.rowcount != 0 except IntegrityError: self.logger.debug( "%s, %s already in outhash database", data["method"], data["outhash"]