diff mbox series

[bitbake-devel,RFC,5/5] hashserv: Add websocket connection implementation

Message ID 20230928170551.4193224-6-JPEWhacker@gmail.com
State New
Headers show
Series Bitbake Hash Server WebSockets Implementation | expand

Commit Message

Joshua Watt Sept. 28, 2023, 5:05 p.m. UTC
Adds support for the hash equivalence client to talk to a server over a
websocket. Since websocket are already message orientated (as opposed to
stream orientated like the other implementations), a new MessageParser()
class is implemented for them (although, this class would work for any
message orientation socket, not just websockets).

Note that websocket support does require the 3rd party websockets python
module be installed on the host, but it should not be required unless
websockets are actually being used.

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
---
 bitbake/lib/bb/asyncrpc/client.py | 38 +++++++++++++++++++++++++++++++
 bitbake/lib/hashserv/__init__.py  |  9 ++++++++
 bitbake/lib/hashserv/client.py    |  1 +
 3 files changed, 48 insertions(+)
diff mbox series

Patch

diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py
index 335da09d8c6..b0892a82436 100644
--- a/bitbake/lib/bb/asyncrpc/client.py
+++ b/bitbake/lib/bb/asyncrpc/client.py
@@ -78,6 +78,35 @@  class StreamParser(object):
             self.writer = None
 
 
+class MessageParser(object):
+    def __init__(self, socket, timeout):
+        self.socket = socket
+        self.timeout = timeout
+
+    async def setup_connection(self, proto_name, proto_version):
+        await self.socket.send("%s %s" % (proto_name, proto_version))
+
+    async def invoke(self, msg):
+        await self.socket.send(json.dumps(msg))
+        try:
+            message = await asyncio.wait_for(self.socket.recv(), self.timeout)
+        except asyncio.TimeoutError:
+            raise ConnectionError("Timed out waiting for server")
+
+        return json.loads(message)
+
+    async def close(self):
+        if self.socket is not None:
+            await self.socket.close()
+            self.socket = None
+
+    async def send(self, msg):
+        await self.socket.send(msg)
+
+    async def recv(self):
+        return await self.socket.recv()
+
+
 class AsyncClient(object):
     def __init__(self, proto_name, proto_version, logger, timeout=30):
         self.socket = None
@@ -111,6 +140,15 @@  class AsyncClient(object):
 
         self._connect_sock = connect_sock
 
+    async def connect_websocket(self, uri):
+        import websockets
+
+        async def connect_sock():
+            websocket = await websockets.connect(uri)
+            return MessageParser(websocket, self.timeout)
+
+        self._connect_sock = connect_sock
+
     async def setup_connection(self):
         await self.socket.setup_connection(self.proto_name, self.proto_version)
 
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 9cb3fd57a51..aa5c7e736d7 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -11,9 +11,12 @@  import itertools
 import json
 
 UNIX_PREFIX = "unix://"
+WS_PREFIX = "ws://"
+WSS_PREFIX = "wss://"
 
 ADDR_TYPE_UNIX = 0
 ADDR_TYPE_TCP = 1
+ADDR_TYPE_WS = 2
 
 # The Python async server defaults to a 64K receive buffer, so we hardcode our
 # maximum chunk size. It would be better if the client and server reported to
@@ -91,6 +94,8 @@  def setup_database(database, sync=True):
 def parse_address(addr):
     if addr.startswith(UNIX_PREFIX):
         return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
+    elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX):
+        return (ADDR_TYPE_WS, (addr,))
     else:
         m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
         if m is not None:
@@ -137,6 +142,8 @@  def create_client(addr):
     (typ, a) = parse_address(addr)
     if typ == ADDR_TYPE_UNIX:
         c.connect_unix(*a)
+    elif typ == ADDR_TYPE_WS:
+        c.connect_websocket(*a)
     else:
         c.connect_tcp(*a)
 
@@ -149,6 +156,8 @@  async def create_async_client(addr):
     (typ, a) = parse_address(addr)
     if typ == ADDR_TYPE_UNIX:
         await c.connect_unix(*a)
+    elif typ == ADDR_TYPE_WS:
+        await c.connect_websocket(*a)
     else:
         await c.connect_tcp(*a)
 
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 7d2b9cb394f..c80aa32fa55 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -107,6 +107,7 @@  class Client(bb.asyncrpc.Client):
         super().__init__()
         self._add_methods(
             "connect_tcp",
+            "connect_websocket",
             "get_unihash",
             "report_unihash",
             "report_unihash_equiv",