@@ -78,6 +78,35 @@ class StreamParser(object):
self.writer = None
+class MessageParser(object):
+ def __init__(self, socket, timeout):
+ self.socket = socket
+ self.timeout = timeout
+
+ async def setup_connection(self, proto_name, proto_version):
+ await self.socket.send("%s %s" % (proto_name, proto_version))
+
+ async def invoke(self, msg):
+ await self.socket.send(json.dumps(msg))
+ try:
+ message = await asyncio.wait_for(self.socket.recv(), self.timeout)
+ except asyncio.TimeoutError:
+ raise ConnectionError("Timed out waiting for server")
+
+ return json.loads(message)
+
+ async def close(self):
+ if self.socket is not None:
+ await self.socket.close()
+ self.socket = None
+
+ async def send(self, msg):
+ await self.socket.send(msg)
+
+ async def recv(self):
+ return await self.socket.recv()
+
+
class AsyncClient(object):
def __init__(self, proto_name, proto_version, logger, timeout=30):
self.socket = None
@@ -111,6 +140,15 @@ class AsyncClient(object):
self._connect_sock = connect_sock
+ async def connect_websocket(self, uri):
+ import websockets
+
+ async def connect_sock():
+ websocket = await websockets.connect(uri)
+ return MessageParser(websocket, self.timeout)
+
+ self._connect_sock = connect_sock
+
async def setup_connection(self):
await self.socket.setup_connection(self.proto_name, self.proto_version)
@@ -11,9 +11,12 @@ import itertools
import json
UNIX_PREFIX = "unix://"
+WS_PREFIX = "ws://"
+WSS_PREFIX = "wss://"
ADDR_TYPE_UNIX = 0
ADDR_TYPE_TCP = 1
+ADDR_TYPE_WS = 2
# The Python async server defaults to a 64K receive buffer, so we hardcode our
# maximum chunk size. It would be better if the client and server reported to
@@ -91,6 +94,8 @@ def setup_database(database, sync=True):
def parse_address(addr):
if addr.startswith(UNIX_PREFIX):
return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
+ elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX):
+ return (ADDR_TYPE_WS, (addr,))
else:
m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
if m is not None:
@@ -137,6 +142,8 @@ def create_client(addr):
(typ, a) = parse_address(addr)
if typ == ADDR_TYPE_UNIX:
c.connect_unix(*a)
+ elif typ == ADDR_TYPE_WS:
+ c.connect_websocket(*a)
else:
c.connect_tcp(*a)
@@ -149,6 +156,8 @@ async def create_async_client(addr):
(typ, a) = parse_address(addr)
if typ == ADDR_TYPE_UNIX:
await c.connect_unix(*a)
+ elif typ == ADDR_TYPE_WS:
+ await c.connect_websocket(*a)
else:
await c.connect_tcp(*a)
@@ -107,6 +107,7 @@ class Client(bb.asyncrpc.Client):
super().__init__()
self._add_methods(
"connect_tcp",
+ "connect_websocket",
"get_unihash",
"report_unihash",
"report_unihash_equiv",
Adds support for the hash equivalence client to talk to a server over a websocket. Since websocket are already message orientated (as opposed to stream orientated like the other implementations), a new MessageParser() class is implemented for them (although, this class would work for any message orientation socket, not just websockets). Note that websocket support does require the 3rd party websockets python module be installed on the host, but it should not be required unless websockets are actually being used. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> --- bitbake/lib/bb/asyncrpc/client.py | 38 +++++++++++++++++++++++++++++++ bitbake/lib/hashserv/__init__.py | 9 ++++++++ bitbake/lib/hashserv/client.py | 1 + 3 files changed, 48 insertions(+)