From patchwork Tue Oct 3 14:52:38 2023
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 31615
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 01/12] asyncrpc: Abstract sockets
Date: Tue, 3 Oct 2023 08:52:38 -0600
Message-Id: <20231003145249.1166276-2-JPEWhacker@gmail.com>
In-Reply-To: <20231003145249.1166276-1-JPEWhacker@gmail.com>
References: <20230928170551.4193224-1-JPEWhacker@gmail.com>
 <20231003145249.1166276-1-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15162

Rewrites the asyncrpc client and server code to make it possible to
have other transport backends that are not stream based (e.g. websockets,
which are message based).
The connection handling classes are now shared between both the client
and server to make it easier to implement new transport mechanisms.

Signed-off-by: Joshua Watt
---
 bitbake/lib/bb/asyncrpc/__init__.py   |  32 +--
 bitbake/lib/bb/asyncrpc/client.py     |  78 ++-----
 bitbake/lib/bb/asyncrpc/connection.py |  95 ++++++++
 bitbake/lib/bb/asyncrpc/exceptions.py |  17 ++
 bitbake/lib/bb/asyncrpc/serv.py       | 298 +++++++++++++-------------
 bitbake/lib/hashserv/__init__.py      |  21 --
 bitbake/lib/hashserv/client.py        |  34 +--
 bitbake/lib/hashserv/server.py        | 111 ++++------
 bitbake/lib/prserv/client.py          |   8 +-
 bitbake/lib/prserv/serv.py            |  31 +--
 10 files changed, 378 insertions(+), 347 deletions(-)
 create mode 100644 bitbake/lib/bb/asyncrpc/connection.py
 create mode 100644 bitbake/lib/bb/asyncrpc/exceptions.py

diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py
index 9a85e9965b7..9f677eac4c5 100644
--- a/bitbake/lib/bb/asyncrpc/__init__.py
+++ b/bitbake/lib/bb/asyncrpc/__init__.py
@@ -4,30 +4,12 @@
 # SPDX-License-Identifier: GPL-2.0-only
 #
 
-import itertools
-import json
-
-# The Python async server defaults to a 64K receive buffer, so we hardcode our
-# maximum chunk size.
It would be better if the client and server reported to -# each other what the maximum chunk sizes were, but that will slow down the -# connection setup with a round trip delay so I'd rather not do that unless it -# is necessary -DEFAULT_MAX_CHUNK = 32 * 1024 - - -def chunkify(msg, max_chunk): - if len(msg) < max_chunk - 1: - yield ''.join((msg, "\n")) - else: - yield ''.join((json.dumps({ - 'chunk-stream': None - }), "\n")) - - args = [iter(msg)] * (max_chunk - 1) - for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): - yield ''.join(itertools.chain(m, "\n")) - yield "\n" - from .client import AsyncClient, Client -from .serv import AsyncServer, AsyncServerConnection, ClientError, ServerError +from .serv import AsyncServer, AsyncServerConnection +from .connection import DEFAULT_MAX_CHUNK +from .exceptions import ( + ClientError, + ServerError, + ConnectionClosedError, +) diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index fa042bbe87c..7f33099b639 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py @@ -10,13 +10,13 @@ import json import os import socket import sys -from . 
import chunkify, DEFAULT_MAX_CHUNK +from .connection import StreamConnection, DEFAULT_MAX_CHUNK +from .exceptions import ConnectionClosedError class AsyncClient(object): def __init__(self, proto_name, proto_version, logger, timeout=30): - self.reader = None - self.writer = None + self.socket = None self.max_chunk = DEFAULT_MAX_CHUNK self.proto_name = proto_name self.proto_version = proto_version @@ -25,7 +25,8 @@ class AsyncClient(object): async def connect_tcp(self, address, port): async def connect_sock(): - return await asyncio.open_connection(address, port) + reader, writer = await asyncio.open_connection(address, port) + return StreamConnection(reader, writer, self.timeout, self.max_chunk) self._connect_sock = connect_sock @@ -40,27 +41,27 @@ class AsyncClient(object): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) sock.connect(os.path.basename(path)) finally: - os.chdir(cwd) - return await asyncio.open_unix_connection(sock=sock) + os.chdir(cwd) + reader, writer = await asyncio.open_unix_connection(sock=sock) + return StreamConnection(reader, writer, self.timeout, self.max_chunk) self._connect_sock = connect_sock async def setup_connection(self): - s = '%s %s\n\n' % (self.proto_name, self.proto_version) - self.writer.write(s.encode("utf-8")) - await self.writer.drain() + # Send headers + await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) + # End of headers + await self.socket.send("") async def connect(self): - if self.reader is None or self.writer is None: - (self.reader, self.writer) = await self._connect_sock() + if self.socket is None: + self.socket = await self._connect_sock() await self.setup_connection() async def close(self): - self.reader = None - - if self.writer is not None: - self.writer.close() - self.writer = None + if self.socket is not None: + await self.socket.close() + self.socket = None async def _send_wrapper(self, proc): count = 0 @@ -71,6 +72,7 @@ class AsyncClient(object): except ( OSError, 
ConnectionError, + ConnectionClosedError, json.JSONDecodeError, UnicodeDecodeError, ) as e: @@ -82,49 +84,15 @@ class AsyncClient(object): await self.close() count += 1 - async def send_message(self, msg): - async def get_line(): - try: - line = await asyncio.wait_for(self.reader.readline(), self.timeout) - except asyncio.TimeoutError: - raise ConnectionError("Timed out waiting for server") - - if not line: - raise ConnectionError("Connection closed") - - line = line.decode("utf-8") - - if not line.endswith("\n"): - raise ConnectionError("Bad message %r" % (line)) - - return line - + async def invoke(self, msg): async def proc(): - for c in chunkify(json.dumps(msg), self.max_chunk): - self.writer.write(c.encode("utf-8")) - await self.writer.drain() - - l = await get_line() - - m = json.loads(l) - if m and "chunk-stream" in m: - lines = [] - while True: - l = (await get_line()).rstrip("\n") - if not l: - break - lines.append(l) - - m = json.loads("".join(lines)) - - return m + await self.socket.send_message(msg) + return await self.socket.recv_message() return await self._send_wrapper(proc) async def ping(self): - return await self.send_message( - {'ping': {}} - ) + return await self.invoke({"ping": {}}) class Client(object): @@ -142,7 +110,7 @@ class Client(object): # required (but harmless) with it. 
         asyncio.set_event_loop(self.loop)
 
-        self._add_methods('connect_tcp', 'ping')
+        self._add_methods("connect_tcp", "ping")
 
     @abc.abstractmethod
     def _get_async_client(self):
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py
new file mode 100644
index 00000000000..c4fd24754c8
--- /dev/null
+++ b/bitbake/lib/bb/asyncrpc/connection.py
@@ -0,0 +1,95 @@
+#
+# Copyright BitBake Contributors
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import asyncio
+import itertools
+import json
+from .exceptions import ClientError, ConnectionClosedError
+
+
+# The Python async server defaults to a 64K receive buffer, so we hardcode our
+# maximum chunk size. It would be better if the client and server reported to
+# each other what the maximum chunk sizes were, but that will slow down the
+# connection setup with a round trip delay so I'd rather not do that unless it
+# is necessary
+DEFAULT_MAX_CHUNK = 32 * 1024
+
+
+def chunkify(msg, max_chunk):
+    if len(msg) < max_chunk - 1:
+        yield "".join((msg, "\n"))
+    else:
+        yield "".join((json.dumps({"chunk-stream": None}), "\n"))
+
+        args = [iter(msg)] * (max_chunk - 1)
+        for m in map("".join, itertools.zip_longest(*args, fillvalue="")):
+            yield "".join(itertools.chain(m, "\n"))
+        yield "\n"
+
+
+class StreamConnection(object):
+    def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK):
+        self.reader = reader
+        self.writer = writer
+        self.timeout = timeout
+        self.max_chunk = max_chunk
+
+    @property
+    def address(self):
+        return self.writer.get_extra_info("peername")
+
+    async def send_message(self, msg):
+        for c in chunkify(json.dumps(msg), self.max_chunk):
+            self.writer.write(c.encode("utf-8"))
+        await self.writer.drain()
+
+    async def recv_message(self):
+        l = await self.recv()
+
+        m = json.loads(l)
+        if not m:
+            return m
+
+        if "chunk-stream" in m:
+            lines = []
+            while True:
+                l = await self.recv()
+                if not l:
+                    break
+                lines.append(l)
+
+            m = json.loads("".join(lines))
+
+        return m
+
+    async def send(self, msg):
+        self.writer.write(("%s\n" % msg).encode("utf-8"))
+        await self.writer.drain()
+
+    async def recv(self):
+        if self.timeout < 0:
+            line = await self.reader.readline()
+        else:
+            try:
+                line = await asyncio.wait_for(self.reader.readline(), self.timeout)
+            except asyncio.TimeoutError:
+                raise ConnectionError("Timed out waiting for data")
+
+        if not line:
+            raise ConnectionClosedError("Connection closed")
+
+        line = line.decode("utf-8")
+
+        if not line.endswith("\n"):
+            raise ConnectionError("Bad message %r" % (line))
+
+        return line.rstrip()
+
+    async def close(self):
+        self.reader = None
+        if self.writer is not None:
+            self.writer.close()
+            self.writer = None
diff --git a/bitbake/lib/bb/asyncrpc/exceptions.py b/bitbake/lib/bb/asyncrpc/exceptions.py
new file mode 100644
index 00000000000..a8942b4f0c5
--- /dev/null
+++ b/bitbake/lib/bb/asyncrpc/exceptions.py
@@ -0,0 +1,17 @@
+#
+# Copyright BitBake Contributors
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+
+class ClientError(Exception):
+    pass
+
+
+class ServerError(Exception):
+    pass
+
+
+class ConnectionClosedError(Exception):
+    pass
diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py
index d2de4891b80..e823a764067 100644
--- a/bitbake/lib/bb/asyncrpc/serv.py
+++ b/bitbake/lib/bb/asyncrpc/serv.py
@@ -12,241 +12,242 @@ import signal
 import socket
 import sys
 import multiprocessing
-from .
import chunkify, DEFAULT_MAX_CHUNK - - -class ClientError(Exception): - pass - - -class ServerError(Exception): - pass +from .connection import StreamConnection +from .exceptions import ClientError, ServerError, ConnectionClosedError class AsyncServerConnection(object): - def __init__(self, reader, writer, proto_name, logger): - self.reader = reader - self.writer = writer + def __init__(self, socket, proto_name, logger): + self.socket = socket self.proto_name = proto_name - self.max_chunk = DEFAULT_MAX_CHUNK self.handlers = { - 'chunk-stream': self.handle_chunk, - 'ping': self.handle_ping, + "ping": self.handle_ping, } self.logger = logger + async def close(self): + await self.socket.close() + async def process_requests(self): try: - self.addr = self.writer.get_extra_info('peername') - self.logger.debug('Client %r connected' % (self.addr,)) + self.logger.debug("Client %r connected" % (self.socket.address,)) # Read protocol and version - client_protocol = await self.reader.readline() + client_protocol = await self.socket.recv() if not client_protocol: return - (client_proto_name, client_proto_version) = client_protocol.decode('utf-8').rstrip().split() + (client_proto_name, client_proto_version) = client_protocol.split() if client_proto_name != self.proto_name: - self.logger.debug('Rejecting invalid protocol %s' % (self.proto_name)) + self.logger.debug("Rejecting invalid protocol %s" % (self.proto_name)) return - self.proto_version = tuple(int(v) for v in client_proto_version.split('.')) + self.proto_version = tuple(int(v) for v in client_proto_version.split(".")) if not self.validate_proto_version(): - self.logger.debug('Rejecting invalid protocol version %s' % (client_proto_version)) + self.logger.debug( + "Rejecting invalid protocol version %s" % (client_proto_version) + ) return # Read headers. 
Currently, no headers are implemented, so look for # an empty line to signal the end of the headers while True: - line = await self.reader.readline() - if not line: - return - - line = line.decode('utf-8').rstrip() - if not line: + header = await self.socket.recv() + if not header: break # Handle messages while True: - d = await self.read_message() + d = await self.socket.recv_message() if d is None: break - await self.dispatch_message(d) - await self.writer.drain() - except ClientError as e: + response = await self.dispatch_message(d) + await self.socket.send_message(response) + except ConnectionClosedError as e: + self.logger.info(str(e)) + except (ClientError, ConnectionError) as e: self.logger.error(str(e)) finally: - self.writer.close() + await self.close() async def dispatch_message(self, msg): for k in self.handlers.keys(): if k in msg: - self.logger.debug('Handling %s' % k) - await self.handlers[k](msg[k]) - return + self.logger.debug("Handling %s" % k) + return await self.handlers[k](msg[k]) raise ClientError("Unrecognized command %r" % msg) - def write_message(self, msg): - for c in chunkify(json.dumps(msg), self.max_chunk): - self.writer.write(c.encode('utf-8')) + async def handle_ping(self, request): + return {"alive": True} - async def read_message(self): - l = await self.reader.readline() - if not l: - return None - try: - message = l.decode('utf-8') +class StreamServer(object): + def __init__(self, handler, logger): + self.handler = handler + self.logger = logger + self.closed = False - if not message.endswith('\n'): - return None + async def handle_stream_client(self, reader, writer): + # writer.transport.set_write_buffer_limits(0) + socket = StreamConnection(reader, writer, -1) + if self.closed: + await socket.close() + return + + await self.handler(socket) + + async def stop(self): + self.closed = True + + +class TCPStreamServer(StreamServer): + def __init__(self, host, port, handler, logger): + super().__init__(handler, logger) + self.host = host 
+ self.port = port + + def start(self, loop): + self.server = loop.run_until_complete( + asyncio.start_server(self.handle_stream_client, self.host, self.port) + ) + + for s in self.server.sockets: + self.logger.debug("Listening on %r" % (s.getsockname(),)) + # Newer python does this automatically. Do it manually here for + # maximum compatibility + s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) + + # Enable keep alives. This prevents broken client connections + # from persisting on the server for long periods of time. + s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) + + name = self.server.sockets[0].getsockname() + if self.server.sockets[0].family == socket.AF_INET6: + self.address = "[%s]:%d" % (name[0], name[1]) + else: + self.address = "%s:%d" % (name[0], name[1]) + + return [self.server.wait_closed()] + + async def stop(self): + await super().stop() + self.server.close() + + def cleanup(self): + pass - return json.loads(message) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - self.logger.error('Bad message from client: %r' % message) - raise e - async def handle_chunk(self, request): - lines = [] - try: - while True: - l = await self.reader.readline() - l = l.rstrip(b"\n").decode("utf-8") - if not l: - break - lines.append(l) +class UnixStreamServer(StreamServer): + def __init__(self, path, handler, logger): + super().__init__(handler, logger) + self.path = path - msg = json.loads(''.join(lines)) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - self.logger.error('Bad message from client: %r' % lines) - raise e + def start(self, loop): + cwd = os.getcwd() + try: + # Work around path length limits in AF_UNIX + os.chdir(os.path.dirname(self.path)) + self.server = loop.run_until_complete( + 
asyncio.start_unix_server( + self.handle_stream_client, os.path.basename(self.path) + ) + ) + finally: + os.chdir(cwd) - if 'chunk-stream' in msg: - raise ClientError("Nested chunks are not allowed") + self.logger.debug("Listening on %r" % self.path) + self.address = "unix://%s" % os.path.abspath(self.path) + return [self.server.wait_closed()] - await self.dispatch_message(msg) + async def stop(self): + await super().stop() + self.server.close() - async def handle_ping(self, request): - response = {'alive': True} - self.write_message(response) + def cleanup(self): + os.unlink(self.path) class AsyncServer(object): def __init__(self, logger): - self._cleanup_socket = None self.logger = logger - self.start = None - self.address = None self.loop = None + self.run_tasks = [] def start_tcp_server(self, host, port): - def start_tcp(): - self.server = self.loop.run_until_complete( - asyncio.start_server(self.handle_client, host, port) - ) - - for s in self.server.sockets: - self.logger.debug('Listening on %r' % (s.getsockname(),)) - # Newer python does this automatically. Do it manually here for - # maximum compatibility - s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) - - # Enable keep alives. This prevents broken client connections - # from persisting on the server for long periods of time. 
- s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) - - name = self.server.sockets[0].getsockname() - if self.server.sockets[0].family == socket.AF_INET6: - self.address = "[%s]:%d" % (name[0], name[1]) - else: - self.address = "%s:%d" % (name[0], name[1]) - - self.start = start_tcp + self.server = TCPStreamServer(host, port, self._client_handler, self.logger) def start_unix_server(self, path): - def cleanup(): - os.unlink(path) - - def start_unix(): - cwd = os.getcwd() - try: - # Work around path length limits in AF_UNIX - os.chdir(os.path.dirname(path)) - self.server = self.loop.run_until_complete( - asyncio.start_unix_server(self.handle_client, os.path.basename(path)) - ) - finally: - os.chdir(cwd) - - self.logger.debug('Listening on %r' % path) - - self._cleanup_socket = cleanup - self.address = "unix://%s" % os.path.abspath(path) - - self.start = start_unix - - @abc.abstractmethod - def accept_client(self, reader, writer): - pass + self.server = UnixStreamServer(path, self._client_handler, self.logger) - async def handle_client(self, reader, writer): - # writer.transport.set_write_buffer_limits(0) + async def _client_handler(self, socket): try: - client = self.accept_client(reader, writer) + client = self.accept_client(socket) await client.process_requests() except Exception as e: import traceback - self.logger.error('Error from client: %s' % str(e), exc_info=True) + + self.logger.error("Error from client: %s" % str(e), exc_info=True) traceback.print_exc() - writer.close() - self.logger.debug('Client disconnected') + await socket.close() + self.logger.debug("Client disconnected") - def run_loop_forever(self): - try: - self.loop.run_forever() - except KeyboardInterrupt: - pass + @abc.abstractmethod + def accept_client(self, socket): + pass + + async def stop(self): + 
self.logger.debug("Stopping server") + await self.server.stop() + + def start(self): + tasks = self.server.start(self.loop) + self.address = self.server.address + return tasks def signal_handler(self): self.logger.debug("Got exit signal") - self.loop.stop() + self.loop.create_task(self.stop()) - def _serve_forever(self): + def _serve_forever(self, tasks): try: self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) + self.loop.add_signal_handler(signal.SIGINT, self.signal_handler) + self.loop.add_signal_handler(signal.SIGQUIT, self.signal_handler) signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) - self.run_loop_forever() - self.server.close() + self.loop.run_until_complete(asyncio.gather(*tasks)) - self.loop.run_until_complete(self.server.wait_closed()) - self.logger.debug('Server shutting down') + self.logger.debug("Server shutting down") finally: - if self._cleanup_socket is not None: - self._cleanup_socket() + self.server.cleanup() def serve_forever(self): """ Serve requests in the current process """ + self._create_loop() + tasks = self.start() + self._serve_forever(tasks) + self.loop.close() + + def _create_loop(self): # Create loop and override any loop that may have existed in # a parent process. It is possible that the usecases of # serve_forever might be constrained enough to allow using # get_event_loop here, but better safe than sorry for now. self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) - self.start() - self._serve_forever() def serve_as_process(self, *, prefunc=None, args=()): """ Serve requests in a child process """ + def run(queue): # Create loop and override any loop that may have existed # in a parent process. Without doing this and instead @@ -259,18 +260,19 @@ class AsyncServer(object): # more general, though, as any potential use of asyncio in # Cooker could create a loop that needs to replaced in this # new process. 
- self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) + self._create_loop() try: - self.start() + self.address = None + tasks = self.start() finally: + # Always put the server address to wake up the parent task queue.put(self.address) queue.close() if prefunc is not None: prefunc(self, *args) - self._serve_forever() + self._serve_forever(tasks) if sys.version_info >= (3, 6): self.loop.run_until_complete(self.loop.shutdown_asyncgens()) diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 9cb3fd57a51..3a4018353f2 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py @@ -15,13 +15,6 @@ UNIX_PREFIX = "unix://" ADDR_TYPE_UNIX = 0 ADDR_TYPE_TCP = 1 -# The Python async server defaults to a 64K receive buffer, so we hardcode our -# maximum chunk size. It would be better if the client and server reported to -# each other what the maximum chunk sizes were, but that will slow down the -# connection setup with a round trip delay so I'd rather not do that unless it -# is necessary -DEFAULT_MAX_CHUNK = 32 * 1024 - UNIHASH_TABLE_DEFINITION = ( ("method", "TEXT NOT NULL", "UNIQUE"), ("taskhash", "TEXT NOT NULL", "UNIQUE"), @@ -102,20 +95,6 @@ def parse_address(addr): return (ADDR_TYPE_TCP, (host, int(port))) -def chunkify(msg, max_chunk): - if len(msg) < max_chunk - 1: - yield ''.join((msg, "\n")) - else: - yield ''.join((json.dumps({ - 'chunk-stream': None - }), "\n")) - - args = [iter(msg)] * (max_chunk - 1) - for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): - yield ''.join(itertools.chain(m, "\n")) - yield "\n" - - def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): from . 
import server db = setup_database(dbname, sync=sync) diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index b2aa1026ac9..47e118c4583 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -28,22 +28,26 @@ class AsyncClient(bb.asyncrpc.AsyncClient): async def send_stream(self, msg): async def proc(): - self.writer.write(("%s\n" % msg).encode("utf-8")) - await self.writer.drain() - l = await self.reader.readline() - if not l: - raise ConnectionError("Connection closed") - return l.decode("utf-8").rstrip() + await self.socket.send(msg) + return await self.socket.recv() return await self._send_wrapper(proc) async def _set_mode(self, new_mode): + async def stream_to_normal(): + await self.socket.send("END") + return await self.socket.recv_message() + + async def normal_to_stream(): + await self.socket.send_message({"get-stream": None}) + return await self.socket.recv() + if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: - r = await self.send_stream("END") + r = await self._send_wrapper(stream_to_normal) if r != "ok": raise ConnectionError("Bad response from server %r" % r) elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: - r = await self.send_message({"get-stream": None}) + r = await self._send_wrapper(normal_to_stream) if r != "ok": raise ConnectionError("Bad response from server %r" % r) elif new_mode != self.mode: @@ -67,7 +71,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient): m["method"] = method m["outhash"] = outhash m["unihash"] = unihash - return await self.send_message({"report": m}) + return await self.invoke({"report": m}) async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): await self._set_mode(self.MODE_NORMAL) @@ -75,31 +79,31 @@ class AsyncClient(bb.asyncrpc.AsyncClient): m["taskhash"] = taskhash m["method"] = method m["unihash"] = unihash - return await self.send_message({"report-equiv": m}) + return await self.invoke({"report-equiv": 
m}) async def get_taskhash(self, method, taskhash, all_properties=False): await self._set_mode(self.MODE_NORMAL) - return await self.send_message( + return await self.invoke( {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} ) async def get_outhash(self, method, outhash, taskhash): await self._set_mode(self.MODE_NORMAL) - return await self.send_message( + return await self.invoke( {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method}} ) async def get_stats(self): await self._set_mode(self.MODE_NORMAL) - return await self.send_message({"get-stats": None}) + return await self.invoke({"get-stats": None}) async def reset_stats(self): await self._set_mode(self.MODE_NORMAL) - return await self.send_message({"reset-stats": None}) + return await self.invoke({"reset-stats": None}) async def backfill_wait(self): await self._set_mode(self.MODE_NORMAL) - return (await self.send_message({"backfill-wait": None}))["tasks"] + return (await self.invoke({"backfill-wait": None}))["tasks"] class Client(bb.asyncrpc.Client): diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index d40a2ab8f88..43eddc5d9f6 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py @@ -165,8 +165,8 @@ class ServerCursor(object): class ServerClient(bb.asyncrpc.AsyncServerConnection): - def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): - super().__init__(reader, writer, 'OEHASHEQUIV', logger) + def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only): + super().__init__(socket, 'OEHASHEQUIV', logger) self.db = db self.request_stats = request_stats self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK @@ -207,12 +207,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): if k in msg: logger.debug('Handling %s' % k) if 'stream' in k: - await self.handlers[k](msg[k]) + return await self.handlers[k](msg[k]) else: with self.request_stats.start_sample() 
as self.request_sample, \ self.request_sample.measure(): - await self.handlers[k](msg[k]) - return + return await self.handlers[k](msg[k]) raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) @@ -222,9 +221,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): fetch_all = request.get('all', False) with closing(self.db.cursor()) as cursor: - d = await self.get_unihash(cursor, method, taskhash, fetch_all) - - self.write_message(d) + return await self.get_unihash(cursor, method, taskhash, fetch_all) async def get_unihash(self, cursor, method, taskhash, fetch_all=False): d = None @@ -271,9 +268,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): taskhash = request['taskhash'] with closing(self.db.cursor()) as cursor: - d = await self.get_outhash(cursor, method, outhash, taskhash) - - self.write_message(d) + return await self.get_outhash(cursor, method, outhash, taskhash) async def get_outhash(self, cursor, method, outhash, taskhash): d = None @@ -317,14 +312,14 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): ) async def handle_get_stream(self, request): - self.write_message('ok') + await self.socket.send("ok") while True: upstream = None - l = await self.reader.readline() + l = await self.socket.recv() if not l: - return + break try: # This inner loop is very sensitive and must be as fast as @@ -335,10 +330,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): request_measure = self.request_sample.measure() request_measure.start() - l = l.decode('utf-8').rstrip() if l == 'END': - self.writer.write('ok\n'.encode('utf-8')) - return + break (method, taskhash) = l.split() #logger.debug('Looking up %s %s' % (method, taskhash)) @@ -349,29 +342,29 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): cursor.close() if row is not None: - msg = ('%s\n' % row['unihash']).encode('utf-8') + msg = row['unihash'] #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) elif self.upstream_client is not None: 
upstream = await self.upstream_client.get_unihash(method, taskhash) if upstream: - msg = ("%s\n" % upstream).encode("utf-8") + msg = upstream else: - msg = "\n".encode("utf-8") + msg = "" else: - msg = '\n'.encode('utf-8') + msg = "" - self.writer.write(msg) + await self.socket.send(msg) finally: request_measure.end() self.request_sample.end() - await self.writer.drain() - # Post to the backfill queue after writing the result to minimize # the turn around time on a request if upstream is not None: await self.backfill_queue.put((method, taskhash)) + return "ok" + async def handle_report(self, data): with closing(self.db.cursor()) as cursor: outhash_data = { @@ -451,7 +444,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): 'unihash': unihash, } - self.write_message(d) + return d async def handle_equivreport(self, data): with closing(self.db.cursor()) as cursor: @@ -474,30 +467,28 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} - self.write_message(d) + return d async def handle_get_stats(self, request): - d = { + return { 'requests': self.request_stats.todict(), } - self.write_message(d) - async def handle_reset_stats(self, request): d = { 'requests': self.request_stats.todict(), } self.request_stats.reset() - self.write_message(d) + return d async def handle_backfill_wait(self, request): d = { 'tasks': self.backfill_queue.qsize(), } await self.backfill_queue.join() - self.write_message(d) + return d def query_equivalent(self, cursor, method, taskhash): # This is part of the inner loop and must be as fast as possible @@ -522,41 +513,33 @@ class Server(bb.asyncrpc.AsyncServer): self.db = db self.upstream = upstream self.read_only = read_only + self.backfill_queue = None - def accept_client(self, reader, writer): - return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) + def accept_client(self, socket): + return 
ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) - @contextmanager - def _backfill_worker(self): - async def backfill_worker_task(): - client = await create_async_client(self.upstream) - try: - while True: - item = await self.backfill_queue.get() - if item is None: - self.backfill_queue.task_done() - break - method, taskhash = item - await copy_unihash_from_upstream(client, self.db, method, taskhash) + async def backfill_worker_task(self): + client = await create_async_client(self.upstream) + try: + while True: + item = await self.backfill_queue.get() + if item is None: self.backfill_queue.task_done() - finally: - await client.close() + break + method, taskhash = item + await copy_unihash_from_upstream(client, self.db, method, taskhash) + self.backfill_queue.task_done() + finally: + await client.close() - async def join_worker(worker): + def start(self): + tasks = super().start() + if self.upstream: + self.backfill_queue = asyncio.Queue() + tasks += [self.backfill_worker_task()] + return tasks + + async def stop(self): + if self.backfill_queue is not None: await self.backfill_queue.put(None) - await worker - - if self.upstream is not None: - worker = asyncio.ensure_future(backfill_worker_task()) - try: - yield - finally: - self.loop.run_until_complete(join_worker(worker)) - else: - yield - - def run_loop_forever(self): - self.backfill_queue = asyncio.Queue() - - with self._backfill_worker(): - super().run_loop_forever() + await super().stop() diff --git a/bitbake/lib/prserv/client.py b/bitbake/lib/prserv/client.py index 69ab7a4ac9d..6b81356fac5 100644 --- a/bitbake/lib/prserv/client.py +++ b/bitbake/lib/prserv/client.py @@ -14,28 +14,28 @@ class PRAsyncClient(bb.asyncrpc.AsyncClient): super().__init__('PRSERVICE', '1.0', logger) async def getPR(self, version, pkgarch, checksum): - response = await self.send_message( + response = await self.invoke( {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': 
checksum}} ) if response: return response['value'] async def importone(self, version, pkgarch, checksum, value): - response = await self.send_message( + response = await self.invoke( {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}} ) if response: return response['value'] async def export(self, version, pkgarch, checksum, colinfo): - response = await self.send_message( + response = await self.invoke( {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}} ) if response: return (response['metainfo'], response['datainfo']) async def is_readonly(self): - response = await self.send_message( + response = await self.invoke( {'is-readonly': {}} ) if response: diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py index c686b2065c4..ea7933164b1 100644 --- a/bitbake/lib/prserv/serv.py +++ b/bitbake/lib/prserv/serv.py @@ -20,8 +20,8 @@ PIDPREFIX = "/tmp/PRServer_%s_%s.pid" singleton = None class PRServerClient(bb.asyncrpc.AsyncServerConnection): - def __init__(self, reader, writer, table, read_only): - super().__init__(reader, writer, 'PRSERVICE', logger) + def __init__(self, socket, table, read_only): + super().__init__(socket, 'PRSERVICE', logger) self.handlers.update({ 'get-pr': self.handle_get_pr, 'import-one': self.handle_import_one, @@ -36,12 +36,12 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): async def dispatch_message(self, msg): try: - await super().dispatch_message(msg) + return await super().dispatch_message(msg) except: self.table.sync() raise - - self.table.sync_if_dirty() + else: + self.table.sync_if_dirty() async def handle_get_pr(self, request): version = request['version'] @@ -57,7 +57,7 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): except sqlite3.Error as exc: logger.error(str(exc)) - self.write_message(response) + return response async def handle_import_one(self, request): response = None @@ -71,7 +71,7 @@ class 
PRServerClient(bb.asyncrpc.AsyncServerConnection): if value is not None: response = {'value': value} - self.write_message(response) + return response async def handle_export(self, request): version = request['version'] @@ -85,12 +85,10 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): logger.error(str(exc)) metainfo = datainfo = None - response = {'metainfo': metainfo, 'datainfo': datainfo} - self.write_message(response) + return {'metainfo': metainfo, 'datainfo': datainfo} async def handle_is_readonly(self, request): - response = {'readonly': self.read_only} - self.write_message(response) + return {'readonly': self.read_only} class PRServer(bb.asyncrpc.AsyncServer): def __init__(self, dbfile, read_only=False): @@ -99,20 +97,23 @@ class PRServer(bb.asyncrpc.AsyncServer): self.table = None self.read_only = read_only - def accept_client(self, reader, writer): - return PRServerClient(reader, writer, self.table, self.read_only) + def accept_client(self, socket): + return PRServerClient(socket, self.table, self.read_only) - def _serve_forever(self): + def start(self): + tasks = super().start() self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) self.table = self.db["PRMAIN"] logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % (self.dbfile, self.address, str(os.getpid()))) - super()._serve_forever() + return tasks + async def stop(self): self.table.sync_if_dirty() self.db.disconnect() + await super().stop() def signal_handler(self): super().signal_handler()

From patchwork Tue Oct 3 14:52:39 2023
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 31616
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 02/12] hashserv: Add remove API
Date: Tue, 3 Oct 2023 08:52:39 -0600
Message-Id: <20231003145249.1166276-3-JPEWhacker@gmail.com>
In-Reply-To: <20231003145249.1166276-1-JPEWhacker@gmail.com>
References: <20230928170551.4193224-1-JPEWhacker@gmail.com> <20231003145249.1166276-1-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15163

Adds a `remove` API to the client and server that can be used to remove hash equivalence entries that match particular criteria.

Signed-off-by: Joshua Watt

--- bitbake/lib/hashserv/client.py | 5 +++++ bitbake/lib/hashserv/server.py | 28 ++++++++++++++++++++++++++++ bitbake/lib/hashserv/tests.py | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 47e118c4583..d11d0ca29d0 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -105,6 +105,10 @@ class AsyncClient(bb.asyncrpc.AsyncClient): await self._set_mode(self.MODE_NORMAL) return (await
self.invoke({"backfill-wait": None}))["tasks"] + async def remove(self, where): + await self._set_mode(self.MODE_NORMAL) + return await self.invoke({"remove": {"where": where}}) + class Client(bb.asyncrpc.Client): def __init__(self): @@ -119,6 +123,7 @@ class Client(bb.asyncrpc.Client): "get_stats", "reset_stats", "backfill_wait", + "remove", ) def _get_async_client(self): diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 43eddc5d9f6..35465240ae9 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py @@ -186,6 +186,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): 'report-equiv': self.handle_equivreport, 'reset-stats': self.handle_reset_stats, 'backfill-wait': self.handle_backfill_wait, + 'remove': self.handle_remove, }) def validate_proto_version(self): @@ -490,6 +491,33 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): await self.backfill_queue.join() return d + async def handle_remove(self, request): + condition = request["where"] + if not isinstance(condition, dict): + raise TypeError("Bad condition type %s" % type(condition)) + + def do_remove(columns, table_name, cursor): + nonlocal condition + where = {} + for c in columns: + if c in condition and condition[c] is not None: + where[c] = condition[c] + + if where: + query = ('DELETE FROM %s WHERE ' % table_name) + ' AND '.join("%s=:%s" % (k, k) for k in where.keys()) + cursor.execute(query, where) + return cursor.rowcount + + return 0 + + count = 0 + with closing(self.db.cursor()) as cursor: + count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) + count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) + self.db.commit() + + return {"count": count} + def query_equivalent(self, cursor, method, taskhash): # This is part of the inner loop and must be as fast as possible cursor.execute( diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index f6b85aed85a..a3e066406e3 100644 --- 
a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -84,6 +84,7 @@ class HashEquivalenceCommonTests(object): result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') + return taskhash, outhash, unihash def test_create_equivalent(self): # Tests that a second reported task with the same outhash will be @@ -125,6 +126,38 @@ class HashEquivalenceCommonTests(object): self.assertClientGetHash(self.client, taskhash, unihash) + def test_remove_taskhash(self): + taskhash, outhash, unihash = self.test_create_hash() + result = self.client.remove({"taskhash": taskhash}) + self.assertGreater(result["count"], 0) + self.assertClientGetHash(self.client, taskhash, None) + + result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash) + self.assertIsNone(result_outhash) + + def test_remove_unihash(self): + taskhash, outhash, unihash = self.test_create_hash() + result = self.client.remove({"unihash": unihash}) + self.assertGreater(result["count"], 0) + self.assertClientGetHash(self.client, taskhash, None) + + def test_remove_outhash(self): + taskhash, outhash, unihash = self.test_create_hash() + result = self.client.remove({"outhash": outhash}) + self.assertGreater(result["count"], 0) + + result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash) + self.assertIsNone(result_outhash) + + def test_remove_method(self): + taskhash, outhash, unihash = self.test_create_hash() + result = self.client.remove({"method": self.METHOD}) + self.assertGreater(result["count"], 0) + self.assertClientGetHash(self.client, taskhash, None) + + result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash) + self.assertIsNone(result_outhash) + def test_huge_message(self): # Simple test that hashes can be created taskhash = 'c665584ee6817aa99edfc77a44dd853828279370'

From patchwork Tue Oct 3 14:52:40 2023
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 31614
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 03/12] bitbake-hashclient: Add remove subcommand
Date: Tue, 3 Oct 2023 08:52:40 -0600
Message-Id: <20231003145249.1166276-4-JPEWhacker@gmail.com>
In-Reply-To: <20231003145249.1166276-1-JPEWhacker@gmail.com>
References: <20230928170551.4193224-1-JPEWhacker@gmail.com> <20231003145249.1166276-1-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15164

Adds a subcommand to invoke the remove API on the server

[YOCTO #15064]

Signed-off-by: Joshua Watt

--- bitbake/bin/bitbake-hashclient | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index 494f17592ac..d09104336ab 100755 ---
a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient @@ -113,6 +113,14 @@ def main(): with lock: pbar.update() + def handle_remove(args, client): + where = {k: v for k, v in args.where} + if where: + result = client.remove(where) + print("Removed %d row(s)" % (result["count"])) + else: + print("No query specified") + parser = argparse.ArgumentParser(description='Hash Equivalence Client') parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') parser.add_argument('--log', default='WARNING', help='Set logging level') @@ -137,6 +145,11 @@ def main(): help='Include string in outhash') stress_parser.set_defaults(func=handle_stress) + remove_parser = subparsers.add_parser('remove', help="Remove hash entries") + remove_parser.add_argument("--where", "-w", metavar="KEY VALUE", nargs=2, action="append", default=[], + help="Remove entries from table where KEY == VALUE") + remove_parser.set_defaults(func=handle_remove) + args = parser.parse_args() logger = logging.getLogger('hashserv')

From patchwork Tue Oct 3 14:52:41 2023
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 31617
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 04/12] hashserv: Add websocket connection implementation
Date: Tue, 3 Oct 2023 08:52:41 -0600
Message-Id: <20231003145249.1166276-5-JPEWhacker@gmail.com>
In-Reply-To: <20231003145249.1166276-1-JPEWhacker@gmail.com>
References: <20230928170551.4193224-1-JPEWhacker@gmail.com> <20231003145249.1166276-1-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15165

Adds support to the hash equivalence client and server to communicate over websockets. Since websockets are message oriented instead of stream oriented, a new connection class is needed to handle them.

Note that websocket support requires the third-party websockets Python module to be installed on the host, but it is not needed unless websockets are actually used.
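The message-oriented vs. stream-oriented split can be pictured with a small, self-contained sketch: an in-memory queue stands in for a websocket (one send() delivers exactly one complete message, so no newline or length-prefix framing is needed), and a connection class layers JSON (de)serialization over any object with send()/recv() coroutines. The class names here are illustrative only, not the actual bb.asyncrpc code:

```python
import asyncio
import json

class MemoryMessageSocket:
    """Stand-in for a message-oriented transport (like a websocket):
    each send() delivers exactly one complete message to recv()."""
    def __init__(self):
        self.queue = asyncio.Queue()

    async def send(self, msg):
        await self.queue.put(msg)

    async def recv(self):
        return await self.queue.get()

class MessageConnection:
    """Sketch of the connection abstraction: JSON encoding is layered
    over any object exposing send()/recv() coroutines, so the RPC code
    above it never sees whether the transport is a stream or not."""
    def __init__(self, socket):
        self.socket = socket

    async def send_message(self, msg):
        await self.socket.send(json.dumps(msg))

    async def recv_message(self):
        return json.loads(await self.socket.recv())

async def demo():
    # Loop the connection back on itself: what we send is what we receive.
    conn = MessageConnection(MemoryMessageSocket())
    await conn.send_message({"get-unihash": {"method": "m", "taskhash": "t"}})
    return await conn.recv_message()

print(asyncio.run(demo()))  # prints {'get-unihash': {'method': 'm', 'taskhash': 't'}}
```

A stream transport would instead need explicit framing (e.g. newline-delimited JSON) inside send_message/recv_message; keeping that detail behind this interface is what lets the same server and client code run over both.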
Signed-off-by: Joshua Watt --- bitbake/lib/bb/asyncrpc/client.py | 11 +++++- bitbake/lib/bb/asyncrpc/connection.py | 44 ++++++++++++++++++++++ bitbake/lib/bb/asyncrpc/serv.py | 53 ++++++++++++++++++++++++++- bitbake/lib/hashserv/__init__.py | 13 +++++++ bitbake/lib/hashserv/client.py | 1 + bitbake/lib/hashserv/tests.py | 17 +++++++++ 6 files changed, 137 insertions(+), 2 deletions(-) diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 7f33099b639..802c07df1f4 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py @@ -10,7 +10,7 @@ import json import os import socket import sys -from .connection import StreamConnection, DEFAULT_MAX_CHUNK +from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK from .exceptions import ConnectionClosedError @@ -47,6 +47,15 @@ class AsyncClient(object): self._connect_sock = connect_sock + async def connect_websocket(self, uri): + import websockets + + async def connect_sock(): + websocket = await websockets.connect(uri, ping_interval=None) + return WebsocketConnection(websocket, self.timeout) + + self._connect_sock = connect_sock + async def setup_connection(self): # Send headers await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py index c4fd24754c8..a10628f75a6 100644 --- a/bitbake/lib/bb/asyncrpc/connection.py +++ b/bitbake/lib/bb/asyncrpc/connection.py @@ -93,3 +93,47 @@ class StreamConnection(object): if self.writer is not None: self.writer.close() self.writer = None + + +class WebsocketConnection(object): + def __init__(self, socket, timeout): + self.socket = socket + self.timeout = timeout + + @property + def address(self): + return ":".join(str(s) for s in self.socket.remote_address) + + async def send_message(self, msg): + await self.send(json.dumps(msg)) + + async def recv_message(self): + m = await self.recv() + return 
json.loads(m) + + async def send(self, msg): + import websockets.exceptions + + try: + await self.socket.send(msg) + except websockets.exceptions.ConnectionClosed: + raise ConnectionClosedError("Connection closed") + + async def recv(self): + import websockets.exceptions + + try: + if self.timeout < 0: + return await self.socket.recv() + + try: + return await asyncio.wait_for(self.socket.recv(), self.timeout) + except asyncio.TimeoutError: + raise ConnectionError("Timed out waiting for data") + except websockets.exceptions.ConnectionClosed: + raise ConnectionClosedError("Connection closed") + + async def close(self): + if self.socket is not None: + await self.socket.close() + self.socket = None diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py index e823a764067..e2589cb0990 100644 --- a/bitbake/lib/bb/asyncrpc/serv.py +++ b/bitbake/lib/bb/asyncrpc/serv.py @@ -12,7 +12,7 @@ import signal import socket import sys import multiprocessing -from .connection import StreamConnection +from .connection import StreamConnection, WebsocketConnection from .exceptions import ClientError, ServerError, ConnectionClosedError @@ -172,6 +172,54 @@ class UnixStreamServer(StreamServer): os.unlink(self.path) +class WebsocketsServer(object): + def __init__(self, host, port, handler, logger): + self.host = host + self.port = port + self.handler = handler + self.logger = logger + + def start(self, loop): + import websockets.server + + self.server = loop.run_until_complete( + websockets.server.serve( + self.client_handler, + self.host, + self.port, + ping_interval=None, + ) + ) + + for s in self.server.sockets: + self.logger.debug("Listening on %r" % (s.getsockname(),)) + + # Enable keep alives. This prevents broken client connections + # from persisting on the server for long periods of time. 
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) + + name = self.server.sockets[0].getsockname() + if self.server.sockets[0].family == socket.AF_INET6: + self.address = "ws://[%s]:%d" % (name[0], name[1]) + else: + self.address = "ws://%s:%d" % (name[0], name[1]) + + return [self.server.wait_closed()] + + async def stop(self): + self.server.close() + + def cleanup(self): + pass + + async def client_handler(self, websocket): + socket = WebsocketConnection(websocket, -1) + await self.handler(socket) + + class AsyncServer(object): def __init__(self, logger): self.logger = logger @@ -184,6 +232,9 @@ class AsyncServer(object): def start_unix_server(self, path): self.server = UnixStreamServer(path, self._client_handler, self.logger) + def start_websocket_server(self, host, port): + self.server = WebsocketsServer(host, port, self._client_handler, self.logger) + async def _client_handler(self, socket): try: client = self.accept_client(socket) diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 3a4018353f2..56b9c6bc829 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py @@ -9,11 +9,15 @@ import re import sqlite3 import itertools import json +from urllib.parse import urlparse UNIX_PREFIX = "unix://" +WS_PREFIX = "ws://" +WSS_PREFIX = "wss://" ADDR_TYPE_UNIX = 0 ADDR_TYPE_TCP = 1 +ADDR_TYPE_WS = 2 UNIHASH_TABLE_DEFINITION = ( ("method", "TEXT NOT NULL", "UNIQUE"), @@ -84,6 +88,8 @@ def setup_database(database, sync=True): def parse_address(addr): if addr.startswith(UNIX_PREFIX): return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) + elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX): + return (ADDR_TYPE_WS, (addr,)) else: m = re.match(r'\[(?P[^\]]*)\]:(?P\d+)$', addr) if m is not None: @@ -103,6 +109,9 @@ def 
create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): (typ, a) = parse_address(addr) if typ == ADDR_TYPE_UNIX: s.start_unix_server(*a) + elif typ == ADDR_TYPE_WS: + url = urlparse(a[0]) + s.start_websocket_server(url.hostname, url.port) else: s.start_tcp_server(*a) @@ -116,6 +125,8 @@ def create_client(addr): (typ, a) = parse_address(addr) if typ == ADDR_TYPE_UNIX: c.connect_unix(*a) + elif typ == ADDR_TYPE_WS: + c.connect_websocket(*a) else: c.connect_tcp(*a) @@ -128,6 +139,8 @@ async def create_async_client(addr): (typ, a) = parse_address(addr) if typ == ADDR_TYPE_UNIX: await c.connect_unix(*a) + elif typ == ADDR_TYPE_WS: + await c.connect_websocket(*a) else: await c.connect_tcp(*a) diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index d11d0ca29d0..95c0692353a 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -115,6 +115,7 @@ class Client(bb.asyncrpc.Client): super().__init__() self._add_methods( "connect_tcp", + "connect_websocket", "get_unihash", "report_unihash", "report_unihash_equiv", diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index a3e066406e3..a2827e6a301 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -464,3 +464,20 @@ class TestHashEquivalenceTCPServer(HashEquivalenceTestSetup, HashEquivalenceComm # If IPv6 is enabled, it should be safe to use localhost directly, in general # case it is more reliable to resolve the IP address explicitly. return socket.gethostbyname("localhost") + ":0" + + +class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): + def setUp(self): + try: + import websockets + except ImportError as e: + self.skipTest(str(e)) + + super().setUp() + + def get_server_addr(self, server_idx): + # Some hosts cause asyncio module to misbehave, when IPv6 is not enabled. 
+ # If IPv6 is enabled, it should be safe to use localhost directly, in general + # case it is more reliable to resolve the IP address explicitly. + host = socket.gethostbyname("localhost") + return "ws://%s:0" % host

From patchwork Tue Oct 3 14:52:42 2023
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 31618
t=1696344778; x=1696949578; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:x-gm-message-state:from:to:cc :subject:date:message-id:reply-to; bh=Ah7T5DLLgKcwbR+nAz+6ZVzD9pqliJvXfCYEytYbfGQ=; b=LmqU+v3vU+Kyr2K1rDvka8lMeL16uAT9KC9T/NhhboHMs46y875b2adtIZkLHObsqa wyFyEnBFMzw3GJU+6PdfDTI3n8u/gW6ayfNrtHJ9+u3mfeTMHahfUrG8dDC8rOkzyaUw Nr30LoT6tJft/U9zLUoxt4KV19t0zqv8+WutDPkowpX6ki0TgriG+cjeL5kaNDSZxg6A fEi0vqA295b06PCXwoli00K4ybyjf7bnBuibUv2FX6T4Cyj+ukfAUGq5/I/kB8fYkaHI W+tUk4LG8mXDPWFs1RjnPljb6tsb0x8RKaLYrZthMEKka3rAVhIdXfYA5bcu3LDjrOtR yXFg== X-Gm-Message-State: AOJu0YwbabssjCLijP6T40GnUQCVY6pqM0jKIh5DNcjVdwOwoqQwkO7h f/DkSMt4iiCrH9ChVCt0TLKc3JxApYQ= X-Google-Smtp-Source: AGHT+IFpvww6Gk+VAQS4zCWKMJSHAIcvdFKjduryLP0xY31gmAkLPJAsNaZLQ9+WC6CBPKIW8zxz9Q== X-Received: by 2002:a05:6870:d38c:b0:1bb:c50d:7451 with SMTP id k12-20020a056870d38c00b001bbc50d7451mr17171122oag.46.1696344778348; Tue, 03 Oct 2023 07:52:58 -0700 (PDT) Received: from localhost.localdomain ([2601:282:4300:19e0::589]) by smtp.gmail.com with ESMTPSA id du28-20020a0568703a1c00b001dd2b869a26sm253477oab.17.2023.10.03.07.52.57 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Tue, 03 Oct 2023 07:52:57 -0700 (PDT) From: Joshua Watt X-Google-Original-From: Joshua Watt To: bitbake-devel@lists.openembedded.org Cc: Joshua Watt Subject: [bitbake-devel][RFC v2 05/12] asyncrpc: Add context manager API Date: Tue, 3 Oct 2023 08:52:42 -0600 Message-Id: <20231003145249.1166276-6-JPEWhacker@gmail.com> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20231003145249.1166276-1-JPEWhacker@gmail.com> References: <20230928170551.4193224-1-JPEWhacker@gmail.com> <20231003145249.1166276-1-JPEWhacker@gmail.com> MIME-Version: 1.0 List-Id: X-Webhook-Received: from li982-79.members.linode.com [45.33.32.79] by aws-us-west-2-korg-lkml-1.web.codeaurora.org with HTTPS for ; Tue, 03 Oct 2023 14:53:02 -0000 X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15166 
Adds context manager API for the asyncrcp client class which allow writing code that will automatically close the connection like so: with hashserv.create_client(address) as client: ... Rework the bitbake-hashclient tool and PR server to use this new API to fix warnings about unclosed event loops when exiting Signed-off-by: Joshua Watt --- bitbake/bin/bitbake-hashclient | 36 +++++++++++++++---------------- bitbake/lib/bb/asyncrpc/client.py | 13 +++++++++++ bitbake/lib/prserv/serv.py | 6 +++--- 3 files changed, 33 insertions(+), 22 deletions(-) diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index d09104336ab..00f100b5de1 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient @@ -56,25 +56,24 @@ def main(): nonlocal missed_hashes nonlocal max_time - client = hashserv.create_client(args.address) - - for i in range(args.requests): - taskhash = hashlib.sha256() - taskhash.update(args.taskhash_seed.encode('utf-8')) - taskhash.update(str(i).encode('utf-8')) + with hashserv.create_client(args.address) as client: + for i in range(args.requests): + taskhash = hashlib.sha256() + taskhash.update(args.taskhash_seed.encode('utf-8')) + taskhash.update(str(i).encode('utf-8')) - start_time = time.perf_counter() - l = client.get_unihash(METHOD, taskhash.hexdigest()) - elapsed = time.perf_counter() - start_time + start_time = time.perf_counter() + l = client.get_unihash(METHOD, taskhash.hexdigest()) + elapsed = time.perf_counter() - start_time - with lock: - if l: - found_hashes += 1 - else: - missed_hashes += 1 + with lock: + if l: + found_hashes += 1 + else: + missed_hashes += 1 - max_time = max(elapsed, max_time) - pbar.update() + max_time = max(elapsed, max_time) + pbar.update() max_time = 0 found_hashes = 0 @@ -165,9 +164,8 @@ def main(): func = getattr(args, 'func', None) if func: - client = hashserv.create_client(args.address) - - return func(args, client) + with hashserv.create_client(args.address) as client: + return 
func(args, client) return 0 diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 802c07df1f4..009085c306a 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py @@ -103,6 +103,12 @@ class AsyncClient(object): async def ping(self): return await self.invoke({"ping": {}}) + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self.close() + class Client(object): def __init__(self): @@ -153,3 +159,10 @@ class Client(object): if sys.version_info >= (3, 6): self.loop.run_until_complete(self.loop.shutdown_asyncgens()) self.loop.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + return False diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py index ea7933164b1..6168eb183d7 100644 --- a/bitbake/lib/prserv/serv.py +++ b/bitbake/lib/prserv/serv.py @@ -345,9 +345,9 @@ def auto_shutdown(): def ping(host, port): from . import client - conn = client.PRClient() - conn.connect_tcp(host, port) - return conn.ping() + with client.PRClient() as conn: + conn.connect_tcp(host, port) + return conn.ping() def connect(host, port): from . 
import client

From patchwork Tue Oct 3 14:52:43 2023
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 06/12] hashserv: tests: Add external database tests
Date: Tue, 3 Oct 2023 08:52:43 -0600
Message-Id: <20231003145249.1166276-7-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15167

Adds support for running the hash equivalence test suite against an
external hash equivalence implementation.
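The gating mechanism can be sketched as a plain `unittest` pattern: the test class skips itself in `setUp()` unless an environment variable names a live server. Here is a minimal standalone sketch assuming only the standard library — the `BB_TEST_HASHSERV` variable and skip message come from the patch, while the class and test names are illustrative:

```python
import os
import unittest


class ExternalServerTest(unittest.TestCase):
    # Every test in the class is skipped unless the caller points
    # BB_TEST_HASHSERV at a live hash equivalence server.
    def setUp(self):
        if "BB_TEST_HASHSERV" not in os.environ:
            self.skipTest("BB_TEST_HASHSERV not defined to test an external server")
        self.server_address = os.environ["BB_TEST_HASHSERV"]

    def test_address_configured(self):
        # A trivial stand-in for real tests that would talk to the server.
        self.assertTrue(self.server_address)
```

Running the suite without the variable set reports the tests as skipped rather than failed, so CI environments without an external server stay green.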
Signed-off-by: Joshua Watt --- bitbake/lib/hashserv/tests.py | 54 ++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index a2827e6a301..82bb005a66d 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -51,13 +51,20 @@ class HashEquivalenceTestSetup(object): server.serve_as_process(prefunc=prefunc, args=(self.server_index,)) self.addCleanup(cleanup_server, server) + return server + + def start_client(self, server_address): def cleanup_client(client): client.close() - client = create_client(server.address) + client = create_client(server_address) self.addCleanup(cleanup_client, client) - return (client, server) + return client + + def start_test_server(self): + server = self.start_server() + return server.address def setUp(self): if sys.version_info < (3, 5, 0): @@ -66,7 +73,9 @@ class HashEquivalenceTestSetup(object): self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv') self.addCleanup(self.temp_dir.cleanup) - (self.client, self.server) = self.start_server() + self.server_address = self.start_test_server() + + self.client = self.start_client(self.server_address) def assertClientGetHash(self, client, taskhash, unihash): result = client.get_unihash(self.METHOD, taskhash) @@ -187,7 +196,7 @@ class HashEquivalenceCommonTests(object): def test_stress(self): def query_server(failures): - client = Client(self.server.address) + client = Client(self.server_address) try: for i in range(1000): taskhash = hashlib.sha256() @@ -226,8 +235,10 @@ class HashEquivalenceCommonTests(object): # the side client. 
It also verifies that the results are pulled into # the downstream database by checking that the downstream and side servers # match after the downstream is done waiting for all backfill tasks - (down_client, down_server) = self.start_server(upstream=self.server.address) - (side_client, side_server) = self.start_server(dbpath=down_server.dbpath) + down_server = self.start_server(upstream=self.server_address) + down_client = self.start_client(down_server.address) + side_server = self.start_server(dbpath=down_server.dbpath) + side_client = self.start_client(side_server.address) def check_hash(taskhash, unihash, old_sidehash): nonlocal down_client @@ -332,14 +343,18 @@ class HashEquivalenceCommonTests(object): self.assertEqual(result['method'], self.METHOD) def test_ro_server(self): - (ro_client, ro_server) = self.start_server(dbpath=self.server.dbpath, read_only=True) + rw_server = self.start_server() + rw_client = self.start_client(rw_server.address) + + ro_server = self.start_server(dbpath=rw_server.dbpath, read_only=True) + ro_client = self.start_client(ro_server.address) # Report a hash via the read-write server taskhash = '35788efcb8dfb0a02659d81cf2bfd695fb30faf9' outhash = '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f' unihash = 'f46d3fbb439bd9b921095da657a4de906510d2cd' - result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + result = rw_client.report_unihash(taskhash, self.METHOD, outhash, unihash) self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') # Check the hash via the read-only server @@ -354,7 +369,7 @@ class HashEquivalenceCommonTests(object): ro_client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) # Ensure that the database was not modified - self.assertClientGetHash(self.client, taskhash2, None) + self.assertClientGetHash(rw_client, taskhash2, None) def test_slow_server_start(self): @@ -374,7 +389,7 @@ class HashEquivalenceCommonTests(object): old_signal = 
signal.signal(signal.SIGTERM, do_nothing)
         self.addCleanup(signal.signal, signal.SIGTERM, old_signal)
 
-        _, server = self.start_server(prefunc=prefunc)
+        server = self.start_server(prefunc=prefunc)
         server.process.terminate()
         time.sleep(30)
         event.set()
@@ -481,3 +496,22 @@ class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalen
         # case it is more reliable to resolve the IP address explicitly.
         host = socket.gethostbyname("localhost")
         return "ws://%s:0" % host
+
+
+class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
+    def start_test_server(self):
+        if 'BB_TEST_HASHSERV' not in os.environ:
+            self.skipTest('BB_TEST_HASHSERV not defined to test an external server')
+
+        return os.environ['BB_TEST_HASHSERV']
+
+    def start_server(self, *args, **kwargs):
+        self.skipTest('Cannot start local server when testing external servers')
+
+    def setUp(self):
+        super().setUp()
+        self.client.remove({"method": self.METHOD})
+
+    def tearDown(self):
+        self.client.remove({"method": self.METHOD})
+        super().tearDown()

From patchwork Tue Oct 3 14:52:44 2023
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 07/12] asyncrpc: Prefix log messages with client info
Date: Tue, 3 Oct 2023 08:52:44 -0600
Message-Id: <20231003145249.1166276-8-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15168

Adds a logging adaptor to the asyncrpc clients that prefixes log
messages with the client remote address to aid in debugging

Signed-off-by: Joshua Watt

---
 bitbake/lib/bb/asyncrpc/serv.py | 20 +++++++++++++++---
 bitbake/lib/hashserv/server.py  | 10 +++++-----
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py
index e2589cb0990..c4829b82e0b 100644
--- a/bitbake/lib/bb/asyncrpc/serv.py
+++ b/bitbake/lib/bb/asyncrpc/serv.py
@@ -12,10 +12,20 @@ import signal
 import socket
 import sys
 import multiprocessing
+import logging
 
 from .connection import StreamConnection, WebsocketConnection
 from .exceptions import ClientError, ServerError, ConnectionClosedError
 
 
+class ClientLoggerAdapter(logging.LoggerAdapter):
+    def __init__(self, logger, address):
+        super().__init__(logger)
+        self.address = address
+
+    def process(self, msg, kwargs):
+        return f"[Client {self.address}] {msg}", kwargs
+
+
 class AsyncServerConnection(object):
     def __init__(self, socket, proto_name, logger):
         self.socket
= socket @@ -23,7 +33,7 @@ class AsyncServerConnection(object): self.handlers = { "ping": self.handle_ping, } - self.logger = logger + self.logger = ClientLoggerAdapter(logger, socket.address) async def close(self): await self.socket.close() @@ -236,16 +246,20 @@ class AsyncServer(object): self.server = WebsocketsServer(host, port, self._client_handler, self.logger) async def _client_handler(self, socket): + address = socket.address try: client = self.accept_client(socket) await client.process_requests() except Exception as e: import traceback - self.logger.error("Error from client: %s" % str(e), exc_info=True) + self.logger.error( + "Error from client %s: %s" % (address, str(e)), exc_info=True + ) traceback.print_exc() + finally: + self.logger.debug("Client %s disconnected", address) await socket.close() - self.logger.debug("Client disconnected") @abc.abstractmethod def accept_client(self, socket): diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 35465240ae9..ee959a6bd94 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py @@ -206,7 +206,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): async def dispatch_message(self, msg): for k in self.handlers.keys(): if k in msg: - logger.debug('Handling %s' % k) + self.logger.debug('Handling %s' % k) if 'stream' in k: return await self.handlers[k](msg[k]) else: @@ -335,7 +335,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): break (method, taskhash) = l.split() - #logger.debug('Looking up %s %s' % (method, taskhash)) + #self.logger.debug('Looking up %s %s' % (method, taskhash)) cursor = self.db.cursor() try: row = self.query_equivalent(cursor, method, taskhash) @@ -344,7 +344,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): if row is not None: msg = row['unihash'] - #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) + #self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], 
row['unihash']))
             elif self.upstream_client is not None:
                 upstream = await self.upstream_client.get_unihash(method, taskhash)
                 if upstream:
@@ -463,8 +463,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
                 row = self.query_equivalent(cursor, data['method'], data['taskhash'])
 
                 if row['unihash'] == data['unihash']:
-                    logger.info('Adding taskhash equivalence for %s with unihash %s',
-                                data['taskhash'], row['unihash'])
+                    self.logger.info('Adding taskhash equivalence for %s with unihash %s',
+                                     data['taskhash'], row['unihash'])
 
                 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

From patchwork Tue Oct 3 14:52:45 2023
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 08/12] bitbake-hashserv: Allow arguments from environment
Date: Tue, 3 Oct 2023 08:52:45 -0600
Message-Id: <20231003145249.1166276-9-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15169

Allows the arguments to the bitbake-hashserv command to be specified
in environment variables. This is a very common idiom when running
services in containers as it allows the arguments to be specified
from different sources as desired by the service administrator

Signed-off-by: Joshua Watt

---
 bitbake/bin/bitbake-hashserv | 75 ++++++++++++++++++++++++++----------
 1 file changed, 55 insertions(+), 20 deletions(-)

diff --git a/bitbake/bin/bitbake-hashserv b/bitbake/bin/bitbake-hashserv
index 00af76b2d11..b09458b60c4 100755
--- a/bitbake/bin/bitbake-hashserv
+++ b/bitbake/bin/bitbake-hashserv
@@ -11,56 +11,91 @@ import logging
 import argparse
 import sqlite3
 import warnings
+
 warnings.simplefilter("default")
 
-sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)), 'lib'))
+sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)), "lib"))
 
 import hashserv
 
 VERSION = "1.0.0"
 
-DEFAULT_BIND = 'unix://./hashserve.sock'
+DEFAULT_BIND = "unix://./hashserve.sock"
 
 
 def main():
-    parser = argparse.ArgumentParser(description='Hash Equivalence Reference Server. Version=%s' % VERSION,
-                                     epilog='''The bind address is the path to a unix domain socket if it is
-                                     prefixed with "unix://". Otherwise, it is an IP address
-                                     and port in form ADDRESS:PORT. To bind to all addresses, leave
-                                     the ADDRESS empty, e.g. "--bind :8686". To bind to a specific
-                                     IPv6 address, enclose the address in "[]", e.g.
- "--bind [::1]:8686"''' - ) - - parser.add_argument('-b', '--bind', default=DEFAULT_BIND, help='Bind address (default "%(default)s")') - parser.add_argument('-d', '--database', default='./hashserv.db', help='Database file (default "%(default)s")') - parser.add_argument('-l', '--log', default='WARNING', help='Set logging level') - parser.add_argument('-u', '--upstream', help='Upstream hashserv to pull hashes from') - parser.add_argument('-r', '--read-only', action='store_true', help='Disallow write operations from clients') + parser = argparse.ArgumentParser( + description="Hash Equivalence Reference Server. Version=%s" % VERSION, + epilog='''The bind address is the path to a unix domain socket if it is + prefixed with "unix://". Otherwise, it is an IP address + and port in form ADDRESS:PORT. To bind to all addresses, leave + the ADDRESS empty, e.g. "--bind :8686". To bind to a specific + IPv6 address, enclose the address in "[]", e.g. + "--bind [::1]:8686"''', + ) + + parser.add_argument( + "-b", + "--bind", + default=os.environ.get("HASHSERVER_BIND", DEFAULT_BIND), + help='Bind address (default $HASHSERVER_BIND, "%(default)s")', + ) + parser.add_argument( + "-d", + "--database", + default=os.environ.get("HASHSERVER_DB", "./hashserv.db"), + help='Database file (default $HASHSERVER_DB, "%(default)s")', + ) + parser.add_argument( + "-l", + "--log", + default=os.environ.get("HASHSERVER_LOG_LEVEL", "WARNING"), + help='Set logging level (default $HASHSERVER_LOG_LEVEL, "%(default)s")', + ) + parser.add_argument( + "-u", + "--upstream", + default=os.environ.get("HASHSERVER_UPSTREAM", None), + help="Upstream hashserv to pull hashes from ($HASHSERVER_UPSTREAM)", + ) + parser.add_argument( + "-r", + "--read-only", + action="store_true", + help="Disallow write operations from clients ($HASHSERVER_READ_ONLY)", + ) args = parser.parse_args() - logger = logging.getLogger('hashserv') + logger = logging.getLogger("hashserv") level = getattr(logging, args.log.upper(), None) if not 
isinstance(level, int):
-        raise ValueError('Invalid log level: %s' % args.log)
+        raise ValueError("Invalid log level: %s" % args.log)
 
     logger.setLevel(level)
 
     console = logging.StreamHandler()
     console.setLevel(level)
     logger.addHandler(console)
 
-    server = hashserv.create_server(args.bind, args.database, upstream=args.upstream, read_only=args.read_only)
+    read_only = (os.environ.get("HASHSERVER_READ_ONLY", "0") == "1") or args.read_only
+
+    server = hashserv.create_server(
+        args.bind,
+        args.database,
+        upstream=args.upstream,
+        read_only=read_only,
+    )
     server.serve_forever()
 
     return 0
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     try:
         ret = main()
     except Exception:
         ret = 1
         import traceback
+
         traceback.print_exc()
     sys.exit(ret)

From patchwork Tue Oct 3 14:52:46 2023
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 09/12] hashserv: Abstract database
Date: Tue, 3 Oct 2023 08:52:46 -0600
Message-Id: <20231003145249.1166276-10-JPEWhacker@gmail.com>
X-Mailer: git-send-email 2.34.1
In-Reply-To: <20231003145249.1166276-1-JPEWhacker@gmail.com>
References: <20230928170551.4193224-1-JPEWhacker@gmail.com> <20231003145249.1166276-1-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15170

Abstracts the way the database backend is accessed by the hash
equivalence server to make it possible to use other backends.

Signed-off-by: Joshua Watt
---
 bitbake/lib/hashserv/__init__.py |  90 ++----
 bitbake/lib/hashserv/server.py   | 460 +++++++++++--------------------
 bitbake/lib/hashserv/sqlite.py   | 229 +++++++++++++++
 3 files changed, 405 insertions(+), 374 deletions(-)
 create mode 100644 bitbake/lib/hashserv/sqlite.py

diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 56b9c6bc829..90d8cff15fb 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -6,7 +6,6 @@
 import asyncio
 from contextlib import closing
 import re
-import sqlite3
 import itertools
 import json
 from urllib.parse import urlparse
@@ -19,92 +18,34 @@
 ADDR_TYPE_UNIX = 0
 ADDR_TYPE_TCP = 1
 ADDR_TYPE_WS = 2

-UNIHASH_TABLE_DEFINITION = (
-    ("method", "TEXT NOT NULL", "UNIQUE"),
-    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
-    ("unihash", "TEXT NOT NULL", ""),
-)
-
-UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)
-
-OUTHASH_TABLE_DEFINITION = (
-    ("method", "TEXT NOT NULL", "UNIQUE"),
-    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
-    ("outhash", "TEXT NOT NULL", "UNIQUE"),
-    ("created", "DATETIME", ""),
-
-    # Optional fields
-    ("owner", "TEXT", ""),
-    ("PN", "TEXT", ""),
-    ("PV", "TEXT", ""),
-    ("PR", "TEXT", ""),
-    ("task", "TEXT", ""),
-    ("outhash_siginfo",
"TEXT", ""), -) - -OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION) - -def _make_table(cursor, name, definition): - cursor.execute(''' - CREATE TABLE IF NOT EXISTS {name} ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - {fields} - UNIQUE({unique}) - ) - '''.format( - name=name, - fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition), - unique=", ".join(name for name, _, flags in definition if "UNIQUE" in flags) - )) - - -def setup_database(database, sync=True): - db = sqlite3.connect(database) - db.row_factory = sqlite3.Row - - with closing(db.cursor()) as cursor: - _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION) - _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) - - cursor.execute('PRAGMA journal_mode = WAL') - cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF')) - - # Drop old indexes - cursor.execute('DROP INDEX IF EXISTS taskhash_lookup') - cursor.execute('DROP INDEX IF EXISTS outhash_lookup') - cursor.execute('DROP INDEX IF EXISTS taskhash_lookup_v2') - cursor.execute('DROP INDEX IF EXISTS outhash_lookup_v2') - - # TODO: Upgrade from tasks_v2? 
- cursor.execute('DROP TABLE IF EXISTS tasks_v2') - - # Create new indexes - cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)') - cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)') - - return db - def parse_address(addr): if addr.startswith(UNIX_PREFIX): - return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) + return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX): return (ADDR_TYPE_WS, (addr,)) else: - m = re.match(r'\[(?P[^\]]*)\]:(?P\d+)$', addr) + m = re.match(r"\[(?P[^\]]*)\]:(?P\d+)$", addr) if m is not None: - host = m.group('host') - port = m.group('port') + host = m.group("host") + port = m.group("port") else: - host, port = addr.split(':') + host, port = addr.split(":") return (ADDR_TYPE_TCP, (host, int(port))) def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): + def sqlite_engine(): + from .sqlite import DatabaseEngine + + return DatabaseEngine(dbname, sync) + from . import server - db = setup_database(dbname, sync=sync) - s = server.Server(db, upstream=upstream, read_only=read_only) + + db_engine = sqlite_engine() + + s = server.Server(db_engine, upstream=upstream, read_only=read_only) (typ, a) = parse_address(addr) if typ == ADDR_TYPE_UNIX: @@ -120,6 +61,7 @@ def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): def create_client(addr): from . import client + c = client.Client() (typ, a) = parse_address(addr) @@ -132,8 +74,10 @@ def create_client(addr): return c + async def create_async_client(addr): from . 
import client + c = client.AsyncClient() (typ, a) = parse_address(addr) diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index ee959a6bd94..238cdab3f7c 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py @@ -5,16 +5,15 @@ from contextlib import closing, contextmanager from datetime import datetime -import enum import asyncio import logging import math import time -from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS +from . import create_async_client import bb.asyncrpc -logger = logging.getLogger('hashserv.server') +logger = logging.getLogger("hashserv.server") class Measurement(object): @@ -104,213 +103,130 @@ class Stats(object): return math.sqrt(self.s / (self.num - 1)) def todict(self): - return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')} - - -@enum.unique -class Resolve(enum.Enum): - FAIL = enum.auto() - IGNORE = enum.auto() - REPLACE = enum.auto() - - -def insert_table(cursor, table, data, on_conflict): - resolve = { - Resolve.FAIL: "", - Resolve.IGNORE: " OR IGNORE", - Resolve.REPLACE: " OR REPLACE", - }[on_conflict] - - keys = sorted(data.keys()) - query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format( - resolve=resolve, - table=table, - fields=", ".join(keys), - values=", ".join(":" + k for k in keys), - ) - prevrowid = cursor.lastrowid - cursor.execute(query, data) - logging.debug( - "Inserting %r into %s, %s", - data, - table, - on_conflict - ) - return (cursor.lastrowid, cursor.lastrowid != prevrowid) - -def insert_unihash(cursor, data, on_conflict): - return insert_table(cursor, "unihashes_v2", data, on_conflict) - -def insert_outhash(cursor, data, on_conflict): - return insert_table(cursor, "outhashes_v2", data, on_conflict) - -async def copy_unihash_from_upstream(client, db, method, taskhash): - d = await client.get_taskhash(method, taskhash) - if d is not None: - with closing(db.cursor()) as cursor: - 
insert_unihash( - cursor, - {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}, - Resolve.IGNORE, - ) - db.commit() - return d - - -class ServerCursor(object): - def __init__(self, db, cursor, upstream): - self.db = db - self.cursor = cursor - self.upstream = upstream + return { + k: getattr(self, k) + for k in ("num", "total_time", "max_time", "average", "stdev") + } class ServerClient(bb.asyncrpc.AsyncServerConnection): - def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only): - super().__init__(socket, 'OEHASHEQUIV', logger) - self.db = db + def __init__( + self, + socket, + db_engine, + request_stats, + backfill_queue, + upstream, + read_only, + ): + super().__init__(socket, "OEHASHEQUIV", logger) + self.db_engine = db_engine self.request_stats = request_stats self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK self.backfill_queue = backfill_queue self.upstream = upstream - self.handlers.update({ - 'get': self.handle_get, - 'get-outhash': self.handle_get_outhash, - 'get-stream': self.handle_get_stream, - 'get-stats': self.handle_get_stats, - }) + self.handlers.update( + { + "get": self.handle_get, + "get-outhash": self.handle_get_outhash, + "get-stream": self.handle_get_stream, + "get-stats": self.handle_get_stats, + } + ) if not read_only: - self.handlers.update({ - 'report': self.handle_report, - 'report-equiv': self.handle_equivreport, - 'reset-stats': self.handle_reset_stats, - 'backfill-wait': self.handle_backfill_wait, - 'remove': self.handle_remove, - }) + self.handlers.update( + { + "report": self.handle_report, + "report-equiv": self.handle_equivreport, + "reset-stats": self.handle_reset_stats, + "backfill-wait": self.handle_backfill_wait, + "remove": self.handle_remove, + } + ) def validate_proto_version(self): - return (self.proto_version > (1, 0) and self.proto_version <= (1, 1)) + return self.proto_version > (1, 0) and self.proto_version <= (1, 1) async def process_requests(self): - if self.upstream is not None: - 
self.upstream_client = await create_async_client(self.upstream) - else: - self.upstream_client = None - - await super().process_requests() + async with self.db_engine.connect(self.logger) as db: + self.db = db + if self.upstream is not None: + self.upstream_client = await create_async_client(self.upstream) + else: + self.upstream_client = None - if self.upstream_client is not None: - await self.upstream_client.close() + try: + await super().process_requests() + finally: + if self.upstream_client is not None: + await self.upstream_client.close() async def dispatch_message(self, msg): for k in self.handlers.keys(): if k in msg: - self.logger.debug('Handling %s' % k) - if 'stream' in k: + self.logger.debug("Handling %s" % k) + if "stream" in k: return await self.handlers[k](msg[k]) else: - with self.request_stats.start_sample() as self.request_sample, \ - self.request_sample.measure(): + with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): return await self.handlers[k](msg[k]) raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) async def handle_get(self, request): - method = request['method'] - taskhash = request['taskhash'] - fetch_all = request.get('all', False) + method = request["method"] + taskhash = request["taskhash"] + fetch_all = request.get("all", False) - with closing(self.db.cursor()) as cursor: - return await self.get_unihash(cursor, method, taskhash, fetch_all) + return await self.get_unihash(method, taskhash, fetch_all) - async def get_unihash(self, cursor, method, taskhash, fetch_all=False): + async def get_unihash(self, method, taskhash, fetch_all=False): d = None if fetch_all: - cursor.execute( - ''' - SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 - INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash - WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash - ORDER BY outhashes_v2.created ASC - LIMIT 1 - ''', 
- { - 'method': method, - 'taskhash': taskhash, - } - - ) - row = cursor.fetchone() - + row = await self.db.get_unihash_by_taskhash_full(method, taskhash) if row is not None: d = {k: row[k] for k in row.keys()} elif self.upstream_client is not None: d = await self.upstream_client.get_taskhash(method, taskhash, True) - self.update_unified(cursor, d) - self.db.commit() + await self.update_unified(d) else: - row = self.query_equivalent(cursor, method, taskhash) + row = await self.db.get_equivalent(method, taskhash) if row is not None: d = {k: row[k] for k in row.keys()} elif self.upstream_client is not None: d = await self.upstream_client.get_taskhash(method, taskhash) - d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS} - insert_unihash(cursor, d, Resolve.IGNORE) - self.db.commit() + await self.db.insert_unihash(d["method"], d["taskhash"], d["unihash"]) return d async def handle_get_outhash(self, request): - method = request['method'] - outhash = request['outhash'] - taskhash = request['taskhash'] + method = request["method"] + outhash = request["outhash"] + taskhash = request["taskhash"] - with closing(self.db.cursor()) as cursor: - return await self.get_outhash(cursor, method, outhash, taskhash) + return await self.get_outhash(method, outhash, taskhash) - async def get_outhash(self, cursor, method, outhash, taskhash): + async def get_outhash(self, method, outhash, taskhash): d = None - cursor.execute( - ''' - SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 - INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash - WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash - ORDER BY outhashes_v2.created ASC - LIMIT 1 - ''', - { - 'method': method, - 'outhash': outhash, - } - ) - row = cursor.fetchone() - + row = await self.db.get_unihash_by_outhash(method, outhash) if row is not None: d = {k: row[k] for k in row.keys()} elif self.upstream_client is not None: d = await 
self.upstream_client.get_outhash(method, outhash, taskhash) - self.update_unified(cursor, d) - self.db.commit() + await self.update_unified(d) return d - def update_unified(self, cursor, data): + async def update_unified(self, data): if data is None: return - insert_unihash( - cursor, - {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS}, - Resolve.IGNORE - ) - insert_outhash( - cursor, - {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}, - Resolve.IGNORE - ) + await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) + await self.db.insert_outhash(data) async def handle_get_stream(self, request): await self.socket.send("ok") @@ -331,20 +247,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): request_measure = self.request_sample.measure() request_measure.start() - if l == 'END': + if l == "END": break (method, taskhash) = l.split() - #self.logger.debug('Looking up %s %s' % (method, taskhash)) - cursor = self.db.cursor() - try: - row = self.query_equivalent(cursor, method, taskhash) - finally: - cursor.close() + # self.logger.debug('Looking up %s %s' % (method, taskhash)) + row = await self.db.get_equivalent(method, taskhash) if row is not None: - msg = row['unihash'] - #self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) + msg = row["unihash"] + # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) elif self.upstream_client is not None: upstream = await self.upstream_client.get_unihash(method, taskhash) if upstream: @@ -367,118 +279,81 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): return "ok" async def handle_report(self, data): - with closing(self.db.cursor()) as cursor: - outhash_data = { - 'method': data['method'], - 'outhash': data['outhash'], - 'taskhash': data['taskhash'], - 'created': datetime.now() - } + outhash_data = { + "method": data["method"], + "outhash": data["outhash"], + "taskhash": data["taskhash"], + "created": 
datetime.now(), + } - for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): - if k in data: - outhash_data[k] = data[k] - - # Insert the new entry, unless it already exists - (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE) - - if inserted: - # If this row is new, check if it is equivalent to another - # output hash - cursor.execute( - ''' - SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 - INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash - -- Select any matching output hash except the one we just inserted - WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash - -- Pick the oldest hash - ORDER BY outhashes_v2.created ASC - LIMIT 1 - ''', - { - 'method': data['method'], - 'outhash': data['outhash'], - 'taskhash': data['taskhash'], - } - ) - row = cursor.fetchone() + for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"): + if k in data: + outhash_data[k] = data[k] - if row is not None: - # A matching output hash was found. Set our taskhash to the - # same unihash since they are equivalent - unihash = row['unihash'] - resolve = Resolve.IGNORE - else: - # No matching output hash was found. This is probably the - # first outhash to be added. 
- unihash = data['unihash'] - resolve = Resolve.IGNORE - - # Query upstream to see if it has a unihash we can use - if self.upstream_client is not None: - upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash']) - if upstream_data is not None: - unihash = upstream_data['unihash'] - - - insert_unihash( - cursor, - { - 'method': data['method'], - 'taskhash': data['taskhash'], - 'unihash': unihash, - }, - resolve - ) - - unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash']) - if unihash_data is not None: - unihash = unihash_data['unihash'] - else: - unihash = data['unihash'] - - self.db.commit() + # Insert the new entry, unless it already exists + if await self.db.insert_outhash(outhash_data): + # If this row is new, check if it is equivalent to another + # output hash + row = await self.db.get_equivalent_for_outhash( + data["method"], data["outhash"], data["taskhash"] + ) - d = { - 'taskhash': data['taskhash'], - 'method': data['method'], - 'unihash': unihash, - } + if row is not None: + # A matching output hash was found. Set our taskhash to the + # same unihash since they are equivalent + unihash = row["unihash"] + else: + # No matching output hash was found. This is probably the + # first outhash to be added. 
+ unihash = data["unihash"] + + # Query upstream to see if it has a unihash we can use + if self.upstream_client is not None: + upstream_data = await self.upstream_client.get_outhash( + data["method"], data["outhash"], data["taskhash"] + ) + if upstream_data is not None: + unihash = upstream_data["unihash"] + + await self.db.insert_unihash(data["method"], data["taskhash"], unihash) + + unihash_data = await self.get_unihash(data["method"], data["taskhash"]) + if unihash_data is not None: + unihash = unihash_data["unihash"] + else: + unihash = data["unihash"] - return d + return { + "taskhash": data["taskhash"], + "method": data["method"], + "unihash": unihash, + } async def handle_equivreport(self, data): - with closing(self.db.cursor()) as cursor: - insert_data = { - 'method': data['method'], - 'taskhash': data['taskhash'], - 'unihash': data['unihash'], - } - insert_unihash(cursor, insert_data, Resolve.IGNORE) - self.db.commit() - - # Fetch the unihash that will be reported for the taskhash. If the - # unihash matches, it means this row was inserted (or the mapping - # was already valid) - row = self.query_equivalent(cursor, data['method'], data['taskhash']) - - if row['unihash'] == data['unihash']: - self.logger.info('Adding taskhash equivalence for %s with unihash %s', - data['taskhash'], row['unihash']) - - d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} - - return d + await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) + + # Fetch the unihash that will be reported for the taskhash. 
If the + # unihash matches, it means this row was inserted (or the mapping + # was already valid) + row = await self.db.get_equivalent(data["method"], data["taskhash"]) + + if row["unihash"] == data["unihash"]: + self.logger.info( + "Adding taskhash equivalence for %s with unihash %s", + data["taskhash"], + row["unihash"], + ) + return {k: row[k] for k in ("taskhash", "method", "unihash")} async def handle_get_stats(self, request): return { - 'requests': self.request_stats.todict(), + "requests": self.request_stats.todict(), } async def handle_reset_stats(self, request): d = { - 'requests': self.request_stats.todict(), + "requests": self.request_stats.todict(), } self.request_stats.reset() @@ -486,7 +361,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): async def handle_backfill_wait(self, request): d = { - 'tasks': self.backfill_queue.qsize(), + "tasks": self.backfill_queue.qsize(), } await self.backfill_queue.join() return d @@ -496,75 +371,58 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): if not isinstance(condition, dict): raise TypeError("Bad condition type %s" % type(condition)) - def do_remove(columns, table_name, cursor): - nonlocal condition - where = {} - for c in columns: - if c in condition and condition[c] is not None: - where[c] = condition[c] - - if where: - query = ('DELETE FROM %s WHERE ' % table_name) + ' AND '.join("%s=:%s" % (k, k) for k in where.keys()) - cursor.execute(query, where) - return cursor.rowcount - - return 0 - - count = 0 - with closing(self.db.cursor()) as cursor: - count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) - count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) - self.db.commit() - - return {"count": count} - - def query_equivalent(self, cursor, method, taskhash): - # This is part of the inner loop and must be as fast as possible - cursor.execute( - 'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash', - { - 'method': method, - 
'taskhash': taskhash, - } - ) - return cursor.fetchone() + return {"count": await self.db.remove(condition)} class Server(bb.asyncrpc.AsyncServer): - def __init__(self, db, upstream=None, read_only=False): + def __init__(self, db_engine, upstream=None, read_only=False): if upstream and read_only: - raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server") + raise bb.asyncrpc.ServerError( + "Read-only hashserv cannot pull from an upstream server" + ) super().__init__(logger) self.request_stats = Stats() - self.db = db + self.db_engine = db_engine self.upstream = upstream self.read_only = read_only self.backfill_queue = None def accept_client(self, socket): - return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) + return ServerClient( + socket, + self.db_engine, + self.request_stats, + self.backfill_queue, + self.upstream, + self.read_only, + ) async def backfill_worker_task(self): - client = await create_async_client(self.upstream) - try: + async with await create_async_client( + self.upstream + ) as client, self.db_engine.connect(logger) as db: while True: item = await self.backfill_queue.get() if item is None: self.backfill_queue.task_done() break + method, taskhash = item - await copy_unihash_from_upstream(client, self.db, method, taskhash) + d = await client.get_taskhash(method, taskhash) + if d is not None: + await db.insert_unihash(d["method"], d["taskhash"], d["unihash"]) self.backfill_queue.task_done() - finally: - await client.close() def start(self): tasks = super().start() if self.upstream: self.backfill_queue = asyncio.Queue() tasks += [self.backfill_worker_task()] + + self.loop.run_until_complete(self.db_engine.create()) + return tasks async def stop(self): diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py new file mode 100644 index 00000000000..866ab179be8 --- /dev/null +++ b/bitbake/lib/hashserv/sqlite.py @@ -0,0 +1,229 @@ +#! 
/usr/bin/env python3 +# +# Copyright (C) 2023 Garmin Ltd. +# +# SPDX-License-Identifier: GPL-2.0-only +# +import sqlite3 +import logging +from contextlib import closing + +logger = logging.getLogger("hashserv.sqlite") + +UNIHASH_TABLE_DEFINITION = ( + ("method", "TEXT NOT NULL", "UNIQUE"), + ("taskhash", "TEXT NOT NULL", "UNIQUE"), + ("unihash", "TEXT NOT NULL", ""), +) + +UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION) + +OUTHASH_TABLE_DEFINITION = ( + ("method", "TEXT NOT NULL", "UNIQUE"), + ("taskhash", "TEXT NOT NULL", "UNIQUE"), + ("outhash", "TEXT NOT NULL", "UNIQUE"), + ("created", "DATETIME", ""), + # Optional fields + ("owner", "TEXT", ""), + ("PN", "TEXT", ""), + ("PV", "TEXT", ""), + ("PR", "TEXT", ""), + ("task", "TEXT", ""), + ("outhash_siginfo", "TEXT", ""), +) + +OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION) + + +def _make_table(cursor, name, definition): + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS {name} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + {fields} + UNIQUE({unique}) + ) + """.format( + name=name, + fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition), + unique=", ".join( + name for name, _, flags in definition if "UNIQUE" in flags + ), + ) + ) + + +class DatabaseEngine(object): + def __init__(self, dbname, sync): + self.dbname = dbname + self.logger = logger + self.sync = sync + + async def create(self): + db = sqlite3.connect(self.dbname) + db.row_factory = sqlite3.Row + + with closing(db.cursor()) as cursor: + _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION) + _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) + + cursor.execute("PRAGMA journal_mode = WAL") + cursor.execute( + "PRAGMA synchronous = %s" % ("NORMAL" if self.sync else "OFF") + ) + + # Drop old indexes + cursor.execute("DROP INDEX IF EXISTS taskhash_lookup") + cursor.execute("DROP INDEX IF EXISTS outhash_lookup") + cursor.execute("DROP INDEX IF EXISTS 
taskhash_lookup_v2") + cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2") + + # TODO: Upgrade from tasks_v2? + cursor.execute("DROP TABLE IF EXISTS tasks_v2") + + # Create new indexes + cursor.execute( + "CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)" + ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" + ) + + def connect(self, logger): + return Database(logger, self.dbname) + + +class Database(object): + def __init__(self, logger, dbname, sync=True): + self.dbname = dbname + self.logger = logger + + self.db = sqlite3.connect(self.dbname) + self.db.row_factory = sqlite3.Row + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self.close() + + async def close(self): + self.db.close() + + async def get_unihash_by_taskhash_full(self, method, taskhash): + with closing(self.db.cursor()) as cursor: + cursor.execute( + """ + SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash + ORDER BY outhashes_v2.created ASC + LIMIT 1 + """, + { + "method": method, + "taskhash": taskhash, + }, + ) + return cursor.fetchone() + + async def get_unihash_by_outhash(self, method, outhash): + with closing(self.db.cursor()) as cursor: + cursor.execute( + """ + SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash + ORDER BY outhashes_v2.created ASC + LIMIT 1 + """, + { + "method": method, + "outhash": outhash, + }, + ) + return cursor.fetchone() + + async def get_equivalent_for_outhash(self, method, outhash, taskhash): + with closing(self.db.cursor()) as 
cursor: + cursor.execute( + """ + SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + -- Select any matching output hash except the one we just inserted + WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash + -- Pick the oldest hash + ORDER BY outhashes_v2.created ASC + LIMIT 1 + """, + { + "method": method, + "outhash": outhash, + "taskhash": taskhash, + }, + ) + return cursor.fetchone() + + async def get_equivalent(self, method, taskhash): + with closing(self.db.cursor()) as cursor: + cursor.execute( + "SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash", + { + "method": method, + "taskhash": taskhash, + }, + ) + return cursor.fetchone() + + async def remove(self, condition): + def do_remove(columns, table_name, cursor): + where = {} + for c in columns: + if c in condition and condition[c] is not None: + where[c] = condition[c] + + if where: + query = ("DELETE FROM %s WHERE " % table_name) + " AND ".join( + "%s=:%s" % (k, k) for k in where.keys() + ) + cursor.execute(query, where) + return cursor.rowcount + + return 0 + + count = 0 + with closing(self.db.cursor()) as cursor: + count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) + count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) + self.db.commit() + + return count + + async def insert_unihash(self, method, taskhash, unihash): + with closing(self.db.cursor()) as cursor: + prevrowid = cursor.lastrowid + cursor.execute( + """ + INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash) VALUES(:method, :taskhash, :unihash) + """, + { + "method": method, + "taskhash": taskhash, + "unihash": unihash, + }, + ) + self.db.commit() + return cursor.lastrowid != prevrowid + + async def insert_outhash(self, data): + data = {k: v for k, v in 
data.items() if k in OUTHASH_TABLE_COLUMNS}
+        keys = sorted(data.keys())
+        query = "INSERT OR IGNORE INTO outhashes_v2 ({fields}) VALUES({values})".format(
+            fields=", ".join(keys),
+            values=", ".join(":" + k for k in keys),
+        )
+        with closing(self.db.cursor()) as cursor:
+            prevrowid = cursor.lastrowid
+            cursor.execute(query, data)
+            self.db.commit()
+            return cursor.lastrowid != prevrowid

From patchwork Tue Oct 3 14:52:47 2023
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 31624
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 10/12] hashserv: Add SQLalchemy backend
Date: Tue, 3 Oct 2023 08:52:47 -0600
Message-Id: <20231003145249.1166276-11-JPEWhacker@gmail.com>
X-Mailer: git-send-email 2.34.1
In-Reply-To: <20231003145249.1166276-1-JPEWhacker@gmail.com>
References: <20230928170551.4193224-1-JPEWhacker@gmail.com> <20231003145249.1166276-1-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15171

Adds an SQLAlchemy backend to the server. While this database backend
is slower than the more direct sqlite backend, it easily supports just
about any SQL server, which is useful for large scale deployments.

Signed-off-by: Joshua Watt
---
 bitbake/bin/bitbake-hashserv          |  12 ++
 bitbake/lib/bb/asyncrpc/connection.py |  11 +-
 bitbake/lib/hashserv/__init__.py      |  21 +-
 bitbake/lib/hashserv/sqlalchemy.py    | 268 ++++++++++++++++++++++++++
 bitbake/lib/hashserv/tests.py         |  19 +-
 5 files changed, 326 insertions(+), 5 deletions(-)
 create mode 100644 bitbake/lib/hashserv/sqlalchemy.py

diff --git a/bitbake/bin/bitbake-hashserv b/bitbake/bin/bitbake-hashserv
index b09458b60c4..a61e5a1223d 100755
--- a/bitbake/bin/bitbake-hashserv
+++ b/bitbake/bin/bitbake-hashserv
@@ -64,6 +64,16 @@ def main():
         action="store_true",
         help="Disallow write operations from clients ($HASHSERVER_READ_ONLY)",
     )
+    parser.add_argument(
+        "--db-username",
+        default=os.environ.get("HASHSERVER_DB_USERNAME", None),
+        help="Database username ($HASHSERVER_DB_USERNAME)",
+    )
+    parser.add_argument(
+        "--db-password",
+        default=os.environ.get("HASHSERVER_DB_PASSWORD", None),
+        help="Database password ($HASHSERVER_DB_PASSWORD)",
+    )
 
     args = parser.parse_args()
@@ -85,6 +95,8 @@ def main():
         args.database,
         upstream=args.upstream,
         read_only=read_only,
+        db_username=args.db_username,
+        db_password=args.db_password,
     )
     server.serve_forever()
     return 0
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py
index a10628f75a6..7f0cf6ba96e 100644
--- a/bitbake/lib/bb/asyncrpc/connection.py
+++ b/bitbake/lib/bb/asyncrpc/connection.py
@@ -7,6 +7,7 @@
 import asyncio
 import itertools
 import json
+from datetime import datetime
 
 from .exceptions import ClientError, ConnectionClosedError
@@ -30,6 +31,12 @@ def chunkify(msg, max_chunk):
         yield "\n"
 
 
+def json_serialize(obj):
+    if isinstance(obj, datetime):
+        return obj.isoformat()
+    raise TypeError("Type %s not serializeable" % type(obj))
+
+
 class StreamConnection(object):
     def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK):
         self.reader = reader
@@ -42,7 +49,7 @@ class StreamConnection(object):
         return self.writer.get_extra_info("peername")
 
     async def send_message(self, msg):
-        for c in chunkify(json.dumps(msg), self.max_chunk):
+        for c in chunkify(json.dumps(msg, default=json_serialize), self.max_chunk):
             self.writer.write(c.encode("utf-8"))
         await self.writer.drain()
@@ -105,7 +112,7 @@ class WebsocketConnection(object):
         return ":".join(str(s) for s in self.socket.remote_address)
 
     async def send_message(self, msg):
-        await self.send(json.dumps(msg))
+        await self.send(json.dumps(msg, default=json_serialize))
 
     async def recv_message(self):
         m = await self.recv()
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 90d8cff15fb..9a8ee4e88bf 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -35,15 +35,32 @@ def parse_address(addr):
         return (ADDR_TYPE_TCP, (host, int(port)))
 
 
-def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
+def create_server(
+    addr,
+    dbname,
+    *,
+    sync=True,
+    upstream=None,
+    read_only=False,
+    db_username=None,
+    db_password=None
+):
     def sqlite_engine():
         from .sqlite import DatabaseEngine
 
         return DatabaseEngine(dbname, sync)
 
+    def sqlalchemy_engine():
+        from .sqlalchemy import DatabaseEngine
+
+        return DatabaseEngine(dbname, db_username, db_password)
+
     from . import server
 
-    db_engine = sqlite_engine()
+    if "://" in dbname:
+        db_engine = sqlalchemy_engine()
+    else:
+        db_engine = sqlite_engine()
 
     s = server.Server(db_engine, upstream=upstream, read_only=read_only)
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py
new file mode 100644
index 00000000000..1b5363c60ba
--- /dev/null
+++ b/bitbake/lib/hashserv/sqlalchemy.py
@@ -0,0 +1,268 @@
+#! /usr/bin/env python3
+#
+# Copyright (C) 2023 Garmin Ltd.
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import logging
+from datetime import datetime
+
+from sqlalchemy.ext.asyncio import create_async_engine
+from sqlalchemy.pool import NullPool
+from sqlalchemy import (
+    MetaData,
+    Column,
+    Table,
+    Text,
+    Integer,
+    UniqueConstraint,
+    DateTime,
+    Index,
+    select,
+    insert,
+    exists,
+    literal,
+    and_,
+    delete,
+)
+import sqlalchemy.engine
+from sqlalchemy.orm import declarative_base
+from sqlalchemy.exc import IntegrityError
+
+logger = logging.getLogger("hashserv.sqlalchemy")
+
+Base = declarative_base()
+
+
+class UnihashesV2(Base):
+    __tablename__ = "unihashes_v2"
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    method = Column(Text, nullable=False)
+    taskhash = Column(Text, nullable=False)
+    unihash = Column(Text, nullable=False)
+
+    __table_args__ = (
+        UniqueConstraint("method", "taskhash"),
+        Index("taskhash_lookup_v3", "method", "taskhash"),
+    )
+
+
+class OuthashesV2(Base):
+    __tablename__ = "outhashes_v2"
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    method = Column(Text, nullable=False)
+    taskhash = Column(Text, nullable=False)
+    outhash = Column(Text, nullable=False)
+    created = Column(DateTime)
+    owner = Column(Text)
+    PN = Column(Text)
+    PV = Column(Text)
+    PR = Column(Text)
+    task = Column(Text)
+    outhash_siginfo = Column(Text)
+
+    __table_args__ = (
+        UniqueConstraint("method", "taskhash", "outhash"),
+        Index("outhash_lookup_v3", "method", "outhash"),
+    )
+
+
+class DatabaseEngine(object):
+    def __init__(self, url, username=None, password=None):
+        self.logger = logger
+        self.url = sqlalchemy.engine.make_url(url)
+
+        if username is not None:
+            self.url = self.url.set(username=username)
+
+        if password is not None:
+            self.url = self.url.set(password=password)
+
+    async def create(self):
+        self.logger.info("Using database %s", self.url)
+        self.engine = create_async_engine(self.url, poolclass=NullPool)
+
+        async with self.engine.begin() as conn:
+            # Create tables
+            logger.info("Creating tables...")
+            await conn.run_sync(Base.metadata.create_all)
+
+    def connect(self, logger):
+        return Database(self.engine, logger)
+
+
+def map_row(row):
+    if row is None:
+        return None
+    return dict(**row._mapping)
+
+
+class Database(object):
+    def __init__(self, engine, logger):
+        self.engine = engine
+        self.db = None
+        self.logger = logger
+
+    async def __aenter__(self):
+        self.db = await self.engine.connect()
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        await self.close()
+
+    async def close(self):
+        await self.db.close()
+        self.db = None
+
+    async def get_unihash_by_taskhash_full(self, method, taskhash):
+        statement = (
+            select(
+                OuthashesV2,
+                UnihashesV2.unihash.label("unihash"),
+            )
+            .join(
+                UnihashesV2,
+                and_(
+                    UnihashesV2.method == OuthashesV2.method,
+                    UnihashesV2.taskhash == OuthashesV2.taskhash,
+                ),
+            )
+            .where(
+                OuthashesV2.method == method,
+                OuthashesV2.taskhash == taskhash,
+            )
+            .order_by(
+                OuthashesV2.created.asc(),
+            )
+            .limit(1)
+        )
+        self.logger.debug("%s", statement)
+        async with self.db.begin():
+            result = await self.db.execute(statement)
+            return map_row(result.first())
+
+    async def get_unihash_by_outhash(self, method, outhash):
+        statement = (
+            select(OuthashesV2, UnihashesV2.unihash.label("unihash"))
+            .join(
+                UnihashesV2,
+                and_(
+                    UnihashesV2.method == OuthashesV2.method,
+                    UnihashesV2.taskhash == OuthashesV2.taskhash,
+                ),
+            )
+            .where(
+                OuthashesV2.method == method,
+                OuthashesV2.outhash == outhash,
+            )
+            .order_by(
+                OuthashesV2.created.asc(),
+            )
+            .limit(1)
+        )
+        self.logger.debug("%s", statement)
+        async with self.db.begin():
+            result = await self.db.execute(statement)
+            return map_row(result.first())
+
+    async def get_equivalent_for_outhash(self, method, outhash, taskhash):
+        statement = (
+            select(
+                OuthashesV2.taskhash.label("taskhash"),
+                UnihashesV2.unihash.label("unihash"),
+            )
+            .join(
+                UnihashesV2,
+                and_(
+                    UnihashesV2.method == OuthashesV2.method,
+                    UnihashesV2.taskhash == OuthashesV2.taskhash,
+                ),
+            )
+            .where(
+                OuthashesV2.method == method,
+                OuthashesV2.outhash == outhash,
+                OuthashesV2.taskhash != taskhash,
+            )
+            .order_by(
+                OuthashesV2.created.asc(),
+            )
+            .limit(1)
+        )
+        self.logger.debug("%s", statement)
+        async with self.db.begin():
+            result = await self.db.execute(statement)
+            return map_row(result.first())
+
+    async def get_equivalent(self, method, taskhash):
+        statement = select(
+            UnihashesV2.unihash,
+            UnihashesV2.method,
+            UnihashesV2.taskhash,
+        ).where(
+            UnihashesV2.method == method,
+            UnihashesV2.taskhash == taskhash,
+        )
+        self.logger.debug("%s", statement)
+        async with self.db.begin():
+            result = await self.db.execute(statement)
+            return map_row(result.first())
+
+    async def remove(self, condition):
+        async def do_remove(table):
+            where = {}
+            for c in table.__table__.columns:
+                if c.key in condition and condition[c.key] is not None:
+                    where[c] = condition[c.key]
+
+            if where:
+                statement = delete(table).where(*[(k == v) for k, v in where.items()])
+                self.logger.debug("%s", statement)
+                async with self.db.begin():
+                    result = await self.db.execute(statement)
+                    return result.rowcount
+
+            return 0
+
+        count = 0
+        count += await do_remove(UnihashesV2)
+        count += await do_remove(OuthashesV2)
+
+        return count
+
+    async def insert_unihash(self, method, taskhash, unihash):
+        statement = insert(UnihashesV2).values(
+            method=method,
+            taskhash=taskhash,
+            unihash=unihash,
+        )
+        self.logger.debug("%s", statement)
+        try:
+            async with self.db.begin():
+                await self.db.execute(statement)
+            return True
+        except IntegrityError:
+            logger.debug(
+                "%s, %s, %s already in unihash database", method, taskhash, unihash
+            )
+            return False
+
+    async def insert_outhash(self, data):
+        outhash_columns = set(c.key for c in OuthashesV2.__table__.columns)
+
+        data = {k: v for k, v in data.items() if k in outhash_columns}
+
+        if "created" in data and not isinstance(data["created"], datetime):
+            data["created"] = datetime.fromisoformat(data["created"])
+
+        statement = insert(OuthashesV2).values(**data)
+        self.logger.debug("%s", statement)
+        try:
+            async with self.db.begin():
+                await self.db.execute(statement)
+            return True
+        except IntegrityError:
+            logger.debug(
+                "%s, %s already in outhash database", data["method"], data["outhash"]
+            )
+            return False
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 82bb005a66d..0afaf9193f4 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -33,7 +33,7 @@ class HashEquivalenceTestSetup(object):
     def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc):
         self.server_index += 1
         if dbpath is None:
-            dbpath = os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
+            dbpath = self.make_dbpath()
 
         def cleanup_server(server):
             if server.process.exitcode is not None:
@@ -53,6 +53,9 @@ class HashEquivalenceTestSetup(object):
 
         return server
 
+    def make_dbpath(self):
+        return os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
+
     def start_client(self, server_address):
         def cleanup_client(client):
             client.close()
@@ -498,6 +501,20 @@ class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalen
         return "ws://%s:0" % host
 
 
+class TestHashEquivalenceWebsocketsSQLAlchemyServer(TestHashEquivalenceWebsocketServer):
+    def setUp(self):
+        try:
+            import sqlalchemy
+            import aiosqlite
+        except ImportError as e:
+            self.skipTest(str(e))
+
+        super().setUp()
+
+    def make_dbpath(self):
+        return "sqlite+aiosqlite:///%s" % os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
+
+
 class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
     def start_test_server(self):
         if 'BB_TEST_HASHSERV' not in os.environ:

From patchwork Tue Oct 3 14:52:48 2023
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 31623
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 11/12] contrib: Update hashserv Dockerfile
Date: Tue, 3 Oct 2023 08:52:48 -0600
Message-Id: <20231003145249.1166276-12-JPEWhacker@gmail.com>
In-Reply-To: <20231003145249.1166276-1-JPEWhacker@gmail.com>
References: <20230928170551.4193224-1-JPEWhacker@gmail.com>
 <20231003145249.1166276-1-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15172

Updates the hash equivalence Dockerfile to the latest Alpine version
and installs the required Python modules into a virtual environment.

Signed-off-by: Joshua Watt
---
 bitbake/contrib/hashserv/Dockerfile       | 33 ++++++++++++++++-------
 bitbake/contrib/hashserv/requirements.txt |  5 ++++
 2 files changed, 29 insertions(+), 9 deletions(-)
 create mode 100644 bitbake/contrib/hashserv/requirements.txt

diff --git a/bitbake/contrib/hashserv/Dockerfile b/bitbake/contrib/hashserv/Dockerfile
index 74b4a3be1d1..7a9525db8d5 100644
--- a/bitbake/contrib/hashserv/Dockerfile
+++ b/bitbake/contrib/hashserv/Dockerfile
@@ -9,15 +9,30 @@
 #  docker build -f contrib/hashserv/Dockerfile .
 #
 
-FROM alpine:3.13.1
+FROM alpine:3.18.4
 
-RUN apk add --no-cache python3
+RUN apk add --no-cache \
+    python3 \
+    py3-virtualenv \
+    py3-pip
 
-COPY bin/bitbake-hashserv /opt/bbhashserv/bin/
-COPY lib/hashserv /opt/bbhashserv/lib/hashserv/
-COPY lib/bb /opt/bbhashserv/lib/bb/
-COPY lib/codegen.py /opt/bbhashserv/lib/codegen.py
-COPY lib/ply /opt/bbhashserv/lib/ply/
-COPY lib/bs4 /opt/bbhashserv/lib/bs4/
+# Setup virtual environment
+RUN mkdir /opt/hashserver
+WORKDIR /opt/hashserver
 
-ENTRYPOINT ["/opt/bbhashserv/bin/bitbake-hashserv"]
+ENV VIRTUAL_ENV=/opt/hashserver/venv
+RUN python3 -m venv $VIRTUAL_ENV
+ENV PATH="$VIRTUAL_ENV/bin:$PATH"
+RUN pip install -U pip
+
+COPY contrib/hashserv/requirements.txt .
+RUN pip install -r requirements.txt && rm requirements.txt
+
+COPY bin/bitbake-hashserv ./bin/
+COPY lib/hashserv ./lib/hashserv/
+COPY lib/bb ./lib/bb/
+COPY lib/codegen.py ./lib/
+COPY lib/ply ./lib/ply/
+COPY lib/bs4 ./lib/bs4/
+
+ENTRYPOINT ["./bin/bitbake-hashserv"]
diff --git a/bitbake/contrib/hashserv/requirements.txt b/bitbake/contrib/hashserv/requirements.txt
new file mode 100644
index 00000000000..567a27d4323
--- /dev/null
+++ b/bitbake/contrib/hashserv/requirements.txt
@@ -0,0 +1,5 @@
+websockets==11.0.3
+sqlalchemy==2.0.21
+sqlalchemy[asyncio]==2.0.21
+aiosqlite==0.19.0
+asyncpg==0.28.0

From patchwork Tue Oct 3 14:52:49 2023
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 31622
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][RFC v2 12/12] contrib: hashserv: Add docker-compose
Date: Tue, 3 Oct 2023 08:52:49 -0600
Message-Id: <20231003145249.1166276-13-JPEWhacker@gmail.com>
In-Reply-To: <20231003145249.1166276-1-JPEWhacker@gmail.com>
References: <20230928170551.4193224-1-JPEWhacker@gmail.com>
 <20231003145249.1166276-1-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15173

Adds a docker-compose.yaml file and a docker-compose wrapper script that
runs it with the correct arguments. This makes it easy to stand up a
hash equivalence server with a PostgreSQL database for demonstration
purposes.

Signed-off-by: Joshua Watt
---
 bitbake/contrib/hashserv/docker-compose      | 11 ++++++
 bitbake/contrib/hashserv/docker-compose.yaml | 36 ++++++++++++++++++++
 2 files changed, 47 insertions(+)
 create mode 100755 bitbake/contrib/hashserv/docker-compose
 create mode 100644 bitbake/contrib/hashserv/docker-compose.yaml

diff --git a/bitbake/contrib/hashserv/docker-compose b/bitbake/contrib/hashserv/docker-compose
new file mode 100755
index 00000000000..74cbf4807d0
--- /dev/null
+++ b/bitbake/contrib/hashserv/docker-compose
@@ -0,0 +1,11 @@
+#! /bin/sh
+# Copyright (C) 2023 Garmin Ltd.
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# Wrapper script to correctly run docker-compose with the correct arguments
+# for the hash equivalence server
+
+cd "$(dirname "$0")/../../"
+
+exec docker-compose -f contrib/hashserv/docker-compose.yaml --project-directory . "$@"
diff --git a/bitbake/contrib/hashserv/docker-compose.yaml b/bitbake/contrib/hashserv/docker-compose.yaml
new file mode 100644
index 00000000000..c1ab73c12e8
--- /dev/null
+++ b/bitbake/contrib/hashserv/docker-compose.yaml
@@ -0,0 +1,36 @@
+# Copyright (C) 2023 Garmin Ltd.
+#
+# SPDX-License-Identifier: GPL-2.0-only
+
+---
+version: "3.7"
+services:
+  postgresql:
+    image: postgres:16.0
+    restart: always
+    environment:
+      - POSTGRES_DB=hashserver
+      - POSTGRES_USER=postgres
+      - POSTGRES_PASSWORD=postgres
+    volumes:
+      - postgres-data:/var/lib/postgresql/data
+
+  hashserver:
+    build:
+      context: .
+      dockerfile: ./contrib/hashserv/Dockerfile
+    image: bitbake-hashserver:latest
+    restart: always
+    ports:
+      - "9000:9000"
+    environment:
+      - HASHSERVER_LOG_LEVEL=INFO
+      - "HASHSERVER_BIND=ws://0.0.0.0:9000"
+      - "HASHSERVER_DB=postgresql+asyncpg://postgresql:5432/hashserver"
+      - HASHSERVER_DB_USERNAME=postgres
+      - HASHSERVER_DB_PASSWORD=postgres
+    depends_on:
+      - postgresql
+
+volumes:
+  postgres-data:
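[Editor's note] The pieces of this series fit together through one small rule
in the create_server() change from patch 10: a database name containing
"://" is treated as a SQLAlchemy URL, while a bare filesystem path keeps the
original sqlite backend. That is why the docker-compose deployment above
needs no extra backend flag: the URL form of HASHSERVER_DB alone selects
SQLAlchemy, and --db-username/--db-password are merged into the URL via
sqlalchemy.engine.make_url().set(). A minimal standalone sketch of the
dispatch rule (the helper name pick_backend is ours, purely illustrative):

```python
def pick_backend(dbname: str) -> str:
    # Mirrors the dispatch added to hashserv.create_server() in patch 10:
    # a dbname containing "://" selects the SQLAlchemy engine, anything
    # else falls back to the built-in sqlite engine.
    return "sqlalchemy" if "://" in dbname else "sqlite"


# docker-compose.yaml passes a PostgreSQL URL:
print(pick_backend("postgresql+asyncpg://postgresql:5432/hashserver"))  # sqlalchemy
# the new test class uses an aiosqlite URL, which is still SQLAlchemy:
print(pick_backend("sqlite+aiosqlite:///tmp/db1.sqlite"))  # sqlalchemy
# a plain path, as used by the existing tests, keeps the sqlite backend:
print(pick_backend("/tmp/db1.sqlite"))  # sqlite
```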