diff mbox series

bitbake: how to create a client to an asyncrpc server from another server?

Message ID f1ddb6b8-fd5f-435f-8b20-ef60f62066db@bootlin.com
State New
Headers show
Series bitbake: how to create a client to an asyncrpc server from another server? | expand

Commit Message

Michael Opdenacker March 4, 2024, 5:53 p.m. UTC
Joshua, all,

I'm trying to connect as a client to another PR server from within a PR 
server instance...

My actual code is more complicated, but I have an issue that results 
from just adding this code to the start() function of the PRServer class 
in bitbake/lib/prserv/serv.py:

      async def stop(self):

Comments

Joshua Watt March 4, 2024, 7:27 p.m. UTC | #1
Michael,

Create an async client instead of a blocking client:

  conn = client.PRAsyncClient()

You'll want to do this anyway because you want to make async calls to
the "upstream" server. If you use a blocking client it will block the
entire server process during the call which means the server won't be
able to process any other clients while waiting for a response.

On Mon, Mar 4, 2024 at 10:53 AM Michael Opdenacker
<michael.opdenacker@bootlin.com> wrote:
>
> Joshua, all,
>
> I'm trying to connect as a client to another PR server from within a PR
> server instance...
>
> My actual code is more complicated, but I have an issue that results
> from just adding this code to the start() function of the PRServer class
> in bitbake/lib/prserv/serv.py:
>
> diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py
> index 62d3b5a01c..66aebacb2e 100644
> --- a/bitbake/lib/prserv/serv.py
> +++ b/bitbake/lib/prserv/serv.py
> @@ -108,6 +108,8 @@ class PRServer(bb.asyncrpc.AsyncServer):
>           logger.info("Started PRServer with DBfile: %s, Address: %s,
> PID: %s" %
>                        (self.dbfile, self.address, str(os.getpid())))
>
> +        from . import client
> +        conn = client.PRClient()
>           return tasks
>
>       async def stop(self):
> --
> 2.34.1
>
> That's what the ping() function in bitbake/lib/prserv/serv.py does by
> the way.
>
> This breaks BitBake "big time", as shown in the bitbake-cookerdaemon.log
> output:
>
> 168775 18:34:06.430012 Base config valid
> Process Process-1:
> Traceback (most recent call last):
>    File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in
> _bootstrap
>      self.run()
>    File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
>      self._target(*self._args, **self._kwargs)
>    File "/home/mike/work/yocto/poky/bitbake/lib/bb/asyncrpc/serv.py",
> line 369, in run
>      self._serve_forever(tasks)
>    File "/home/mike/work/yocto/poky/bitbake/lib/bb/asyncrpc/serv.py",
> line 314, in _serve_forever
>      self.loop.run_until_complete(asyncio.gather(*tasks))
>    File "/usr/lib/python3.10/asyncio/base_events.py", line 628, in
> run_until_complete
>      future = tasks.ensure_future(future, loop=self)
>    File "/usr/lib/python3.10/asyncio/tasks.py", line 615, in ensure_future
>      return _ensure_future(coro_or_future, loop=loop)
>    File "/usr/lib/python3.10/asyncio/tasks.py", line 621, in _ensure_future
>      raise ValueError('The future belongs to a different loop than '
> ValueError: The future belongs to a different loop than the one
> specified as the loop argument
> 168775 18:34:06.439408 Sending reply (None,
> 'bb.BBHandledException()\nTraceback (most recent call last):\n File
> "/home/mike/work/yocto/poky/bitbake/lib/prserv/serv.py", line 333, in
> auto_start\n    ping(host, port)\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/prserv/serv.py", line 352, in
> ping\n    return conn.ping()\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/asyncrpc/client.py", line
> 178, in wrapper\n    return self.loop.run_until_complete(downcall(*args,
> **kwargs))\n  File "/usr/lib/python3.10/asyncio/base_events.py", line
> 649, in run_until_complete\n    return future.result()\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/asyncrpc/client.py", line
> 146, in ping\n    return await self.invoke({"ping": {}})\n File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/asyncrpc/client.py", line
> 141, in invoke\n    result = await self._send_wrapper(proc)\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/asyncrpc/client.py", line
> 128, in _send_wrapper\n    raise e\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/asyncrpc/client.py", line
> 115, in _send_wrapper\n    await self.connect()\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/asyncrpc/client.py", line
> 100, in connect\n    self.socket = await self._connect_sock()\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/asyncrpc/client.py", line 41,
> in connect_sock\n    reader, writer = await
> asyncio.open_connection(address, port)\n  File
> "/usr/lib/python3.10/asyncio/streams.py", line 48, in
> open_connection\n    transport, _ = await loop.create_connection(\n
> File "/usr/lib/python3.10/asyncio/base_events.py", line 1076, in
> create_connection\n    raise exceptions[0]\n  File
> "/usr/lib/python3.10/asyncio/base_events.py", line 1060, in
> create_connection\n    sock = await self._connect_sock(\n  File
> "/usr/lib/python3.10/asyncio/base_events.py", line 969, in
> _connect_sock\n    await self.sock_connect(sock, address)\n  File
> "/usr/lib/python3.10/asyncio/selector_events.py", line 501, in
> sock_connect\n    return await fut\n  File
> "/usr/lib/python3.10/asyncio/selector_events.py", line 541, in
> _sock_connect_cb\n    raise OSError(err, f\'Connect call failed
> {address}\')\nConnectionRefusedError: [Errno 111] Connect call failed
> (\'127.0.0.1\', 37321)\n\nDuring handling of the above exception,
> another exception occurred:\n\nTraceback (most recent call last):\n
> File "/home/mike/work/yocto/poky/bitbake/lib/bb/cooker.py", line 308, in
> handlePRServ\n    self.prhost = prserv.serv.auto_start(self.data)\n
> File "/home/mike/work/yocto/poky/bitbake/lib/prserv/serv.py", line 338,
> in auto_start\n    raise
> PRServiceConfigError\nprserv.serv.PRServiceConfigError\n\nDuring
> handling of the above exception, another exception
> occurred:\n\nTraceback (most recent call last):\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/command.py", line 90, in
> runCommand\n    result = command_method(self, commandline)\n File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/command.py", line 290, in
> updateConfig\n    command.cooker.updateConfigOpts(options, environment,
> cmdline)\n  File "/home/mike/work/yocto/poky/bitbake/lib/bb/cooker.py",
> line 462, in updateConfigOpts\n    self.reset()\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/cooker.py", line 1737, in
> reset\n    self.handlePRServ()\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/cooker.py", line 310, in
> handlePRServ\n    bb.fatal("Unable to start PR Server, exiting, check
> the bitbake-cookerdaemon.log")\n  File
> "/home/mike/work/yocto/poky/bitbake/lib/bb/__init__.py", line 189, in
> fatal\n    raise BBHandledException()\nbb.BBHandledException\n')
> 168775 18:34:06.439442 Command Completed (socket: True)
> 168775 18:34:06.689943 Processing Client
> 168775 18:34:06.690247 Disconnecting Client (socket: True)
> 168775 18:34:06.691510 No timeout, exiting.
> 168775 18:34:06.792083 Exiting (socket: True)
> 168775 18:34:06.795611 Exiting as we could obtain the lock
> sys:1: ResourceWarning: unclosed file <_io.TextIOWrapper
> name='/home/mike/work/yocto/poky/build/bitbake-cookerdaemon.log'
> mode='a+' encoding='UTF-8'>
>
> I guess the new event loop created in the
> bitbake/lib/bb/asyncrpc/client.py causes the kind of issue reported on
> https://git.yoctoproject.org/poky/tree/bitbake/lib/bb/asyncrpc/client.py#n160.
>
> So, would you have some guidance on how to implement a connection to
> another server from inside a server?
> Thanks in advance
> Cheers
> Michael.
>
> --
> Michael Opdenacker, Bootlin
> Embedded Linux and Kernel engineering
> https://bootlin.com
>
Michael Opdenacker March 5, 2024, 2:09 p.m. UTC | #2
Hi Joshua

On 3/4/24 at 20:27, Joshua Watt wrote:
> Michael,
>
> Create an async client instead of a blocking client:
>
>    conn = client.PRAsyncClient()
>
> You'll want to do this anyway because you want to make async calls to
> the "upstream" server. If you use a blocking client it will block the
> entire server process during the call which means the server won't be
> able to process any other clients while waiting for a response.

That was exactly what I needed, thanks! Now, the "upstream" server is 
receiving the connections from my local server. The received requests 
are still invalid (investigating) but this is already a nice leap forward.
Many thanks again
Cheers
Michael.
Michael Opdenacker March 6, 2024, 11:28 a.m. UTC | #3
Hi Joshua

On 3/5/24 at 15:09, Michael Opdenacker via lists.openembedded.org wrote:
> Hi Joshua
>
> On 3/4/24 at 20:27, Joshua Watt wrote:
>> Michael,
>>
>> Create an async client instead of a blocking client:
>>
>>    conn = client.PRAsyncClient()
>>
>> You'll want to do this anyway because you want to make async calls to
>> the "upstream" server. If you use a blocking client it will block the
>> entire server process during the call which means the server won't be
>> able to process any other clients while waiting for a response.
>
> That was exactly what I needed, thanks! Now, the "upstream" server is 
> receiving the connections from my local server. The received requests 
> are still invalid (investigating) but this is already a nice leap 
> forward.


Actually, I'm struggling to reproduce this, as the upstream server 
doesn't seem to receive connections any more.

Anyway, I'm not sure what you're suggesting is what I want. The 
connection to the upstream server is attempted from the start() function 
of the PRServer class, which is synchronous, so a synchronous connection 
to the upstream server should be fine too, right?

However, this makes we wonder whether the local server start is the best 
place to establish the connection to the upstream server. It may be 
better to attempt to establish the connection for each client, as the 
upstream server could be temporarily unavailable. This way, some clients 
will get a failure, but later other clients will succeed, when the 
upstream server is back. This would work without having to restart the 
local server.

This looks like a better solution, right?
Cheers
Michael.
Joshua Watt March 6, 2024, 3:23 p.m. UTC | #4
On Wed, Mar 6, 2024 at 4:28 AM Michael Opdenacker via
lists.openembedded.org
<michael.opdenacker=bootlin.com@lists.openembedded.org> wrote:
>
> Hi Joshua
>
> On 3/5/24 at 15:09, Michael Opdenacker via lists.openembedded.org wrote:
> > Hi Joshua
> >
> > On 3/4/24 at 20:27, Joshua Watt wrote:
> >> Michael,
> >>
> >> Create an async client instead of a blocking client:
> >>
> >>    conn = client.PRAsyncClient()
> >>
> >> You'll want to do this anyway because you want to make async calls to
> >> the "upstream" server. If you use a blocking client it will block the
> >> entire server process during the call which means the server won't be
> >> able to process any other clients while waiting for a response.
> >
> > That was exactly what I needed, thanks! Now, the "upstream" server is
> > receiving the connections from my local server. The received requests
> > are still invalid (investigating) but this is already a nice leap
> > forward.
>
>
> Actually, I'm struggling to reproduce this, as the upstream server
> doesn't seem to receive connections any more.
>
> Anyway, I'm not sure what you're suggesting is what I want. The
> connection to the upstream server is attempted from the start() function
> of the PRServer class, which is synchronous, so a synchronous connection
> to the upstream server should be fine too, right?

The PRServer and clients are only implemented asynchronously; in both
cases the "synchronous" version is just a wrapper around the async
version that makes a loop to handle requests for your (and in the case
of PRServer, I think it also runs that loop on a thread in the
background). In the case of using a client from the inside the server,
you won't be able to use the "blocking" start() function to create the
client, because any blocking calls run on the server will block the
entire server and all clients it's currently processing (e.g. the
server has a single thread and uses cooperative multitasking to
service the clients, so it should never make truly blocking calls as
they block everyone). Instead, in the server you'll need to create an
async client and the manually connect it to the server using the
correct connection function, e.g.:

  client = PRAsyncClient()
  await client.connect_websocket("http://my-upstream-server/websocket")

See the AsyncClient class in bitbake for all the connect functions; it
might be worth moving the address string parsing logic in
bitbake/lib/hashserv/__init__.py in that class so it can be shared by
all the async clients.

>
> However, this makes we wonder whether the local server start is the best
> place to establish the connection to the upstream server. It may be
> better to attempt to establish the connection for each client, as the
> upstream server could be temporarily unavailable. This way, some clients
> will get a failure, but later other clients will succeed, when the
> upstream server is back. This would work without having to restart the
> local server.

Yes. This is what the hash equivalent server does also. You'll
actually have to do it this way because the PRAsyncClient() (or any
AsyncClient for that matter) is not designed to have concurrent or
interleaved requests. In the PRServer, each connected client gets its
own "thread" (although remember it's cooperative multitasking, so
there aren't actually threads). If you tried to share a single client
connection between all the client "threads", it could end up
interleaving requests to the server on a single client, which will not
work. Instead, create a PRAsyncClient() for each client connected to
the server. In the hash equivalence server this is done in
hashsever.client.ServerClient::process_requests(). I'd recommend doing
similar in the PRServerClient class

FWIW, process_requests() is basically the "thread" entry point for the
client. In the AsyncClient class, it goes into a loop reading requests
and sending responses until the connection closes, so you override
this in your class to make the upstream client, then call
super().process_client() to go into the normal request handling (as in
the HE example).

>
> This looks like a better solution, right?
> Cheers
> Michael.
>
> --
> Michael Opdenacker, Bootlin
> Embedded Linux and Kernel engineering
> https://bootlin.com
>
>
> -=-=-=-=-=-=-=-=-=-=-=-
> Links: You receive all messages sent to this group.
> View/Reply Online (#15988): https://lists.openembedded.org/g/bitbake-devel/message/15988
> Mute This Topic: https://lists.openembedded.org/mt/104727453/3616693
> Group Owner: bitbake-devel+owner@lists.openembedded.org
> Unsubscribe: https://lists.openembedded.org/g/bitbake-devel/unsub [JPEWhacker@gmail.com]
> -=-=-=-=-=-=-=-=-=-=-=-
>
diff mbox series

Patch

diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py
index 62d3b5a01c..66aebacb2e 100644
--- a/bitbake/lib/prserv/serv.py
+++ b/bitbake/lib/prserv/serv.py
@@ -108,6 +108,8 @@  class PRServer(bb.asyncrpc.AsyncServer):
          logger.info("Started PRServer with DBfile: %s, Address: %s, 
PID: %s" %
                       (self.dbfile, self.address, str(os.getpid())))

+        from . import client
+        conn = client.PRClient()
          return tasks