diff mbox series

[RFC,v2] qemurunner.py: continue to drain stdout after login:

Message ID 20231211155729.106010-1-alex.bennee@linaro.org
State Under Review
Headers show
Series [RFC,v2] qemurunner.py: continue to drain stdout after login: | expand

Commit Message

Alex Bennée Dec. 11, 2023, 3:57 p.m. UTC
If qemurunner doesn't continuously drain stdout we will eventually
cause QEMU to block while trying to write to the pipe. This can
manifest itself if the guest has for example configured its serial
ports to output via stdio even if the test itself is using a TCP
console or SSH to run things.

To do this:

  - always create a logging thread regardless of serial_ports
  - use a semaphore between main and logging threads
  - move the login detection into the logging thread
  - wait until the second acquire before continuing

This doesn't address a potential overflow of stderr although generally
stderr from QEMU will be a lot less likely to block due to the volume
of data.

Signed-off-by: Alex Bennée <alex.bennee@linaro.org>
Cc: Mikko Rapeli <mikko.rapeli@linaro.org>
---
 meta/lib/oeqa/utils/qemurunner.py | 128 ++++++++++++++++++------------
 1 file changed, 78 insertions(+), 50 deletions(-)

Comments

Khem Raj Dec. 12, 2023, 6:26 p.m. UTC | #1
this version hangs the run forever.

On Mon, Dec 11, 2023 at 7:57 AM Alex Bennée <alex.bennee@linaro.org> wrote:
>
> If qemurunner doesn't continuously drain stdout we will eventually
> cause QEMU to block while trying to write to the pipe. This can
> manifest itself if the guest has for example configured its serial
> ports to output via stdio even if the test itself is using a TCP
> console or SSH to run things.
>
> To do this:
>
>   - always create a logging thread regardless of serial_ports
>   - use a semaphore between main and logging threads
>   - move the login detection into the logging thread
>   - wait until the second acquire before continuing
>
> This doesn't address a potential overflow of stderr although generally
> stderr from QEMU will be a lot less likely to block due to the volume
> of data.
>
> Signed-off-by: Alex Bennée <alex.bennee@linaro.org>
> Cc: Mikko Rapeli <mikko.rapeli@linaro.org>
> ---
>  meta/lib/oeqa/utils/qemurunner.py | 128 ++++++++++++++++++------------
>  1 file changed, 78 insertions(+), 50 deletions(-)
>
> diff --git a/meta/lib/oeqa/utils/qemurunner.py b/meta/lib/oeqa/utils/qemurunner.py
> index 29fe271976..b768c08f04 100644
> --- a/meta/lib/oeqa/utils/qemurunner.py
> +++ b/meta/lib/oeqa/utils/qemurunner.py
> @@ -207,8 +207,7 @@ class QemuRunner:
>          self.logger.info("QMP Available for connection at %s" % (qmp_port2))
>
>          try:
> -            if self.serial_ports >= 2:
> -                self.threadsock, threadport = self.create_socket()
> +            self.threadsock, threadport = self.create_socket()
>              self.server_socket, self.serverport = self.create_socket()
>          except socket.error as msg:
>              self.logger.error("Failed to create listening socket: %s" % msg[1])
> @@ -243,6 +242,7 @@ class QemuRunner:
>          # to be a proper fix but this will suffice for now.
>          self.runqemu = subprocess.Popen(launch_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, preexec_fn=os.setpgrp, env=env, cwd=self.tmpdir)
>          output = self.runqemu.stdout
> +        output_drain = output
>          launch_time = time.time()
>
>          #
> @@ -431,21 +431,30 @@ class QemuRunner:
>          self.logger.debug("Target IP: %s" % self.ip)
>          self.logger.debug("Server IP: %s" % self.server_ip)
>
> -        if self.serial_ports >= 2:
> -            self.thread = LoggingThread(self.log, self.threadsock, self.logger)
> -            self.thread.start()
> -            if not self.thread.connection_established.wait(self.boottime):
> -                self.logger.error("Didn't receive a console connection from qemu. "
> -                             "Here is the qemu command line used:\n%s\nand "
> -                             "output from runqemu:\n%s" % (cmdline, out))
> -                self.stop_thread()
> -                return False
> +        # Create and hold onto the login semaphore, this will block
> +        # the LoggingThread until we are ready
> +        login_semaphore = threading.Semaphore()
> +        login_semaphore.acquire()
> +
> +        self.thread = LoggingThread(self.log, self.threadsock,
> +                                    self.runqemu.stdout, self.boot_patterns['search_reached_prompt'],
> +                                    self.logger, login_semaphore)
> +
> +        self.thread.start()
> +        login_semaphore.release()
> +
> +        # if not self.thread.connection_established.wait(self.boottime):
> +        #     self.logger.error("Didn't receive a console connection from qemu. "
> +        #                       "Here is the qemu command line used:\n%s\nand "
> +        #                       "output from runqemu:\n%s" % (cmdline, out))
> +        #     self.stop_thread()
> +        #     return False
>
>          self.logger.debug("Output from runqemu:\n%s", out)
>          self.logger.debug("Waiting at most %d seconds for login banner (%s)" %
>                            (self.boottime, time.strftime("%D %H:%M:%S")))
>          endtime = time.time() + self.boottime
> -        filelist = [self.server_socket, self.runqemu.stdout]
> +        filelist = [self.server_socket]
>          reachedlogin = False
>          stopread = False
>          qemusock = None
> @@ -464,46 +473,19 @@ class QemuRunner:
>                      filelist.remove(self.server_socket)
>                      self.logger.debug("Connection from %s:%s" % addr)
>                  else:
> -                    # try to avoid reading only a single character at a time
> -                    time.sleep(0.1)
> -                    if hasattr(file, 'read'):
> -                        read = file.read(1024)
> -                    elif hasattr(file, 'recv'):
> -                        read = file.recv(1024)
> -                    else:
> -                        self.logger.error('Invalid file type: %s\n%s' % (file))
> -                        read = b''
> -
> -                    self.logger.debug2('Partial boot log:\n%s' % (read.decode('utf-8', errors='backslashreplace')))
> -                    data = data + read
> -                    if data:
> -                        bootlog += data
> -                        self.log(data, extension = ".2")
> -                        data = b''
> -
> -                        if bytes(self.boot_patterns['search_reached_prompt'], 'utf-8') in bootlog:
> -                            self.server_socket.close()
> -                            self.server_socket = qemusock
> -                            stopread = True
> -                            reachedlogin = True
> -                            self.logger.debug("Reached login banner in %.2f seconds (%s)" %
> -                                              (time.time() - (endtime - self.boottime),
> -                                              time.strftime("%D %H:%M:%S")))
> -                    else:
> -                        # no need to check if reachedlogin unless we support multiple connections
> -                        self.logger.debug("QEMU socket disconnected before login banner reached. (%s)" %
> -                                          time.strftime("%D %H:%M:%S"))
> -                        filelist.remove(file)
> -                        file.close()
> +                    if login_semaphore.acquire(blocking=False):
> +                        self.server_socket.close()
> +                        self.server_socket = qemusock
>                          stopread = True
> +                        reachedlogin = True
> +
> +        self.logger.debug("continuing on....")
>
>          if not reachedlogin:
>              if time.time() >= endtime:
>                  self.logger.warning("Target didn't reach login banner in %d seconds (%s)" %
>                                    (self.boottime, time.strftime("%D %H:%M:%S")))
>              tail = lambda l: "\n".join(l.splitlines()[-25:])
> -            bootlog = self.decode_qemulog(bootlog)
> -            self.logger.warning("Last 25 lines of login console (%d):\n%s" % (len(bootlog), tail(bootlog)))
>              self.logger.warning("Last 25 lines of all logging (%d):\n%s" % (len(self.msg), tail(self.msg)))
>              self.logger.warning("Check full boot log: %s" % self.logfile)
>              self.stop()
> @@ -539,6 +521,7 @@ class QemuRunner:
>                  self.logger.warning("The output:\n%s" % output)
>          except:
>              self.logger.warning("Serial console failed while trying to login")
> +
>          return True
>
>      def stop(self):
> @@ -696,14 +679,16 @@ class QemuRunner:
>                      status = 1
>          return (status, str(data))
>
> -# This class is for reading data from a socket and passing it to logfunc
> -# to be processed. It's completely event driven and has a straightforward
> -# event loop. The mechanism for stopping the thread is a simple pipe which
> -# will wake up the poll and allow for tearing everything down.
> +# This class is for reading data from sockets and QEMU's stdio output
> +# and passing it to logfunc to be processed. It's completely event
> +# driven and has a straightforward event loop. The mechanism for
> +# stopping the thread is a simple pipe which will wake up the poll and
> +# allow for tearing everything down.
>  class LoggingThread(threading.Thread):
> -    def __init__(self, logfunc, sock, logger):
> +    def __init__(self, logfunc, sock, qemu_stdio, prompt, logger, semaphore):
>          self.connection_established = threading.Event()
>          self.serversock = sock
> +        self.qemu_stdio = qemu_stdio
>          self.logfunc = logfunc
>          self.logger = logger
>          self.readsock = None
> @@ -713,9 +698,19 @@ class LoggingThread(threading.Thread):
>          self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
>          self.readevents = select.POLLIN | select.POLLPRI
>
> +        # tracking until we see prompt
> +        self.prompt = prompt
> +        self.prompt_seen = False
> +        self.boot_log = b''
> +        self.semaphore = semaphore
> +        self.boottime = time.time()
> +
>          threading.Thread.__init__(self, target=self.threadtarget)
>
>      def threadtarget(self):
> +        # we acquire until we see the boot prompt
> +        self.semaphore.acquire()
> +
>          try:
>              self.eventloop()
>          finally:
> @@ -750,6 +745,7 @@ class LoggingThread(threading.Thread):
>          event_read_mask = self.errorevents | self.readevents
>          poll.register(self.serversock.fileno())
>          poll.register(self.readpipe, event_read_mask)
> +        poll.register(self.qemu_stdio, event_read_mask)
>
>          breakout = False
>          self.running = True
> @@ -757,8 +753,12 @@ class LoggingThread(threading.Thread):
>          while not breakout:
>              events = poll.poll()
>              for event in events:
> +
> +                self.logger.debug(f"event is {event}")
> +
>                  # An error occurred, bail out
>                  if event[1] & self.errorevents:
> +                    self.logger.debug("error event")
>                      raise Exception(self.stringify_event(event[1]))
>
>                  # Event to stop the thread
> @@ -767,6 +767,10 @@ class LoggingThread(threading.Thread):
>                      breakout = True
>                      break
>
> +                # stdio data to be logged
> +                elif self.qemu_stdio.fileno() == event[0]:
> +                    self.consume_qemu_stdio()
> +
>                  # A connection request was received
>                  elif self.serversock.fileno() == event[0]:
>                      self.logger.debug("Connection request received")
> @@ -783,6 +787,30 @@ class LoggingThread(threading.Thread):
>                      data = self.recv(1024)
>                      self.logfunc(data)
>
> +
> +    # Consume QEMU's stdio output, checking for login strings
> +    def consume_qemu_stdio(self):
> +        # try to avoid reading only a single character at a time
> +        time.sleep(0.1)
> +        read = self.qemu_stdio.read(1024)
> +
> +        # log what we have seen
> +        decoded_data = read.decode('utf-8', errors='backslashreplace')
> +        if self.prompt_seen:
> +            self.logger.debug2('Post login log:\n%s' % decoded_data)
> +        else:
> +            self.logger.debug2('Pre login log:\n%s' % decoded_data)
> +
> +        if not self.prompt_seen and read:
> +            self.boot_log += read
> +
> +            if bytes(self.prompt, 'utf-8') in self.boot_log:
> +                time_to_login = time.time() - self.boottime
> +                self.logger.debug("Reached login banner in %.2f seconds (%s)" %
> +                                  (time_to_login,  time.strftime("%D %H:%M:%S")))
> +                self.semaphore.release()
> +                self.prompt_seen = True
> +
>      # Since the socket is non-blocking make sure to honor EAGAIN
>      # and EWOULDBLOCK.
>      def recv(self, count):
> --
> 2.39.2
>
>
> -=-=-=-=-=-=-=-=-=-=-=-
> Links: You receive all messages sent to this group.
> View/Reply Online (#192177): https://lists.openembedded.org/g/openembedded-core/message/192177
> Mute This Topic: https://lists.openembedded.org/mt/103111135/1997914
> Group Owner: openembedded-core+owner@lists.openembedded.org
> Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub [raj.khem@gmail.com]
> -=-=-=-=-=-=-=-=-=-=-=-
>
Alex Bennée Dec. 12, 2023, 6:51 p.m. UTC | #2
Khem Raj <raj.khem@gmail.com> writes:

> this version hangs the run forever.

Do you have a reproducer? In my "make test" case it works fine.

Alternatively you could gdb attach to the hanging python and use py-bt
to generate a python backtrace.

>
> On Mon, Dec 11, 2023 at 7:57 AM Alex Bennée <alex.bennee@linaro.org> wrote:
>>
>> If qemurunner doesn't continuously drain stdout we will eventually
>> cause QEMU to block while trying to write to the pipe. This can
>> manifest itself if the guest has for example configured its serial
>> ports to output via stdio even if the test itself is using a TCP
>> console or SSH to run things.
>>
>> To do this:
>>
>>   - always create a logging thread regardless of serial_ports
>>   - use a semaphore between main and logging threads
>>   - move the login detection into the logging thread
>>   - wait until the second acquire before continuing
>>
>> This doesn't address a potential overflow of stderr although generally
>> stderr from QEMU will be a lot less likely to block due to the volume
>> of data.
>>
>> Signed-off-by: Alex Bennée <alex.bennee@linaro.org>
>> Cc: Mikko Rapeli <mikko.rapeli@linaro.org>
>> ---
>>  meta/lib/oeqa/utils/qemurunner.py | 128 ++++++++++++++++++------------
>>  1 file changed, 78 insertions(+), 50 deletions(-)
>>
>> diff --git a/meta/lib/oeqa/utils/qemurunner.py b/meta/lib/oeqa/utils/qemurunner.py
>> index 29fe271976..b768c08f04 100644
>> --- a/meta/lib/oeqa/utils/qemurunner.py
>> +++ b/meta/lib/oeqa/utils/qemurunner.py
>> @@ -207,8 +207,7 @@ class QemuRunner:
>>          self.logger.info("QMP Available for connection at %s" % (qmp_port2))
>>
>>          try:
>> -            if self.serial_ports >= 2:
>> -                self.threadsock, threadport = self.create_socket()
>> +            self.threadsock, threadport = self.create_socket()
>>              self.server_socket, self.serverport = self.create_socket()
>>          except socket.error as msg:
>>              self.logger.error("Failed to create listening socket: %s" % msg[1])
>> @@ -243,6 +242,7 @@ class QemuRunner:
>>          # to be a proper fix but this will suffice for now.
>>          self.runqemu = subprocess.Popen(launch_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, preexec_fn=os.setpgrp, env=env, cwd=self.tmpdir)
>>          output = self.runqemu.stdout
>> +        output_drain = output
>>          launch_time = time.time()
>>
>>          #
>> @@ -431,21 +431,30 @@ class QemuRunner:
>>          self.logger.debug("Target IP: %s" % self.ip)
>>          self.logger.debug("Server IP: %s" % self.server_ip)
>>
>> -        if self.serial_ports >= 2:
>> -            self.thread = LoggingThread(self.log, self.threadsock, self.logger)
>> -            self.thread.start()
>> -            if not self.thread.connection_established.wait(self.boottime):
>> -                self.logger.error("Didn't receive a console connection from qemu. "
>> -                             "Here is the qemu command line used:\n%s\nand "
>> -                             "output from runqemu:\n%s" % (cmdline, out))
>> -                self.stop_thread()
>> -                return False
>> +        # Create and hold onto the login semaphore, this will block
>> +        # the LoggingThread until we are ready
>> +        login_semaphore = threading.Semaphore()
>> +        login_semaphore.acquire()
>> +
>> +        self.thread = LoggingThread(self.log, self.threadsock,
>> +                                    self.runqemu.stdout, self.boot_patterns['search_reached_prompt'],
>> +                                    self.logger, login_semaphore)
>> +
>> +        self.thread.start()
>> +        login_semaphore.release()
>> +
>> +        # if not self.thread.connection_established.wait(self.boottime):
>> +        #     self.logger.error("Didn't receive a console connection from qemu. "
>> +        #                       "Here is the qemu command line used:\n%s\nand "
>> +        #                       "output from runqemu:\n%s" % (cmdline, out))
>> +        #     self.stop_thread()
>> +        #     return False
>>
>>          self.logger.debug("Output from runqemu:\n%s", out)
>>          self.logger.debug("Waiting at most %d seconds for login banner (%s)" %
>>                            (self.boottime, time.strftime("%D %H:%M:%S")))
>>          endtime = time.time() + self.boottime
>> -        filelist = [self.server_socket, self.runqemu.stdout]
>> +        filelist = [self.server_socket]
>>          reachedlogin = False
>>          stopread = False
>>          qemusock = None
>> @@ -464,46 +473,19 @@ class QemuRunner:
>>                      filelist.remove(self.server_socket)
>>                      self.logger.debug("Connection from %s:%s" % addr)
>>                  else:
>> -                    # try to avoid reading only a single character at a time
>> -                    time.sleep(0.1)
>> -                    if hasattr(file, 'read'):
>> -                        read = file.read(1024)
>> -                    elif hasattr(file, 'recv'):
>> -                        read = file.recv(1024)
>> -                    else:
>> -                        self.logger.error('Invalid file type: %s\n%s' % (file))
>> -                        read = b''
>> -
>> -                    self.logger.debug2('Partial boot log:\n%s' % (read.decode('utf-8', errors='backslashreplace')))
>> -                    data = data + read
>> -                    if data:
>> -                        bootlog += data
>> -                        self.log(data, extension = ".2")
>> -                        data = b''
>> -
>> -                        if bytes(self.boot_patterns['search_reached_prompt'], 'utf-8') in bootlog:
>> -                            self.server_socket.close()
>> -                            self.server_socket = qemusock
>> -                            stopread = True
>> -                            reachedlogin = True
>> -                            self.logger.debug("Reached login banner in %.2f seconds (%s)" %
>> -                                              (time.time() - (endtime - self.boottime),
>> -                                              time.strftime("%D %H:%M:%S")))
>> -                    else:
>> -                        # no need to check if reachedlogin unless we support multiple connections
>> -                        self.logger.debug("QEMU socket disconnected before login banner reached. (%s)" %
>> -                                          time.strftime("%D %H:%M:%S"))
>> -                        filelist.remove(file)
>> -                        file.close()
>> +                    if login_semaphore.acquire(blocking=False):
>> +                        self.server_socket.close()
>> +                        self.server_socket = qemusock
>>                          stopread = True
>> +                        reachedlogin = True
>> +
>> +        self.logger.debug("continuing on....")
>>
>>          if not reachedlogin:
>>              if time.time() >= endtime:
>>                  self.logger.warning("Target didn't reach login banner in %d seconds (%s)" %
>>                                    (self.boottime, time.strftime("%D %H:%M:%S")))
>>              tail = lambda l: "\n".join(l.splitlines()[-25:])
>> -            bootlog = self.decode_qemulog(bootlog)
>> -            self.logger.warning("Last 25 lines of login console (%d):\n%s" % (len(bootlog), tail(bootlog)))
>>              self.logger.warning("Last 25 lines of all logging (%d):\n%s" % (len(self.msg), tail(self.msg)))
>>              self.logger.warning("Check full boot log: %s" % self.logfile)
>>              self.stop()
>> @@ -539,6 +521,7 @@ class QemuRunner:
>>                  self.logger.warning("The output:\n%s" % output)
>>          except:
>>              self.logger.warning("Serial console failed while trying to login")
>> +
>>          return True
>>
>>      def stop(self):
>> @@ -696,14 +679,16 @@ class QemuRunner:
>>                      status = 1
>>          return (status, str(data))
>>
>> -# This class is for reading data from a socket and passing it to logfunc
>> -# to be processed. It's completely event driven and has a straightforward
>> -# event loop. The mechanism for stopping the thread is a simple pipe which
>> -# will wake up the poll and allow for tearing everything down.
>> +# This class is for reading data from sockets and QEMU's stdio output
>> +# and passing it to logfunc to be processed. It's completely event
>> +# driven and has a straightforward event loop. The mechanism for
>> +# stopping the thread is a simple pipe which will wake up the poll and
>> +# allow for tearing everything down.
>>  class LoggingThread(threading.Thread):
>> -    def __init__(self, logfunc, sock, logger):
>> +    def __init__(self, logfunc, sock, qemu_stdio, prompt, logger, semaphore):
>>          self.connection_established = threading.Event()
>>          self.serversock = sock
>> +        self.qemu_stdio = qemu_stdio
>>          self.logfunc = logfunc
>>          self.logger = logger
>>          self.readsock = None
>> @@ -713,9 +698,19 @@ class LoggingThread(threading.Thread):
>>          self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
>>          self.readevents = select.POLLIN | select.POLLPRI
>>
>> +        # tracking until we see prompt
>> +        self.prompt = prompt
>> +        self.prompt_seen = False
>> +        self.boot_log = b''
>> +        self.semaphore = semaphore
>> +        self.boottime = time.time()
>> +
>>          threading.Thread.__init__(self, target=self.threadtarget)
>>
>>      def threadtarget(self):
>> +        # we acquire until we see the boot prompt
>> +        self.semaphore.acquire()
>> +
>>          try:
>>              self.eventloop()
>>          finally:
>> @@ -750,6 +745,7 @@ class LoggingThread(threading.Thread):
>>          event_read_mask = self.errorevents | self.readevents
>>          poll.register(self.serversock.fileno())
>>          poll.register(self.readpipe, event_read_mask)
>> +        poll.register(self.qemu_stdio, event_read_mask)
>>
>>          breakout = False
>>          self.running = True
>> @@ -757,8 +753,12 @@ class LoggingThread(threading.Thread):
>>          while not breakout:
>>              events = poll.poll()
>>              for event in events:
>> +
>> +                self.logger.debug(f"event is {event}")
>> +
>>                  # An error occurred, bail out
>>                  if event[1] & self.errorevents:
>> +                    self.logger.debug("error event")
>>                      raise Exception(self.stringify_event(event[1]))
>>
>>                  # Event to stop the thread
>> @@ -767,6 +767,10 @@ class LoggingThread(threading.Thread):
>>                      breakout = True
>>                      break
>>
>> +                # stdio data to be logged
>> +                elif self.qemu_stdio.fileno() == event[0]:
>> +                    self.consume_qemu_stdio()
>> +
>>                  # A connection request was received
>>                  elif self.serversock.fileno() == event[0]:
>>                      self.logger.debug("Connection request received")
>> @@ -783,6 +787,30 @@ class LoggingThread(threading.Thread):
>>                      data = self.recv(1024)
>>                      self.logfunc(data)
>>
>> +
>> +    # Consume QEMU's stdio output, checking for login strings
>> +    def consume_qemu_stdio(self):
>> +        # try to avoid reading only a single character at a time
>> +        time.sleep(0.1)
>> +        read = self.qemu_stdio.read(1024)
>> +
>> +        # log what we have seen
>> +        decoded_data = read.decode('utf-8', errors='backslashreplace')
>> +        if self.prompt_seen:
>> +            self.logger.debug2('Post login log:\n%s' % decoded_data)
>> +        else:
>> +            self.logger.debug2('Pre login log:\n%s' % decoded_data)
>> +
>> +        if not self.prompt_seen and read:
>> +            self.boot_log += read
>> +
>> +            if bytes(self.prompt, 'utf-8') in self.boot_log:
>> +                time_to_login = time.time() - self.boottime
>> +                self.logger.debug("Reached login banner in %.2f seconds (%s)" %
>> +                                  (time_to_login,  time.strftime("%D %H:%M:%S")))
>> +                self.semaphore.release()
>> +                self.prompt_seen = True
>> +
>>      # Since the socket is non-blocking make sure to honor EAGAIN
>>      # and EWOULDBLOCK.
>>      def recv(self, count):
>> --
>> 2.39.2
>>
>>
>> -=-=-=-=-=-=-=-=-=-=-=-
>> Links: You receive all messages sent to this group.
>> View/Reply Online (#192177): https://lists.openembedded.org/g/openembedded-core/message/192177
>> Mute This Topic: https://lists.openembedded.org/mt/103111135/1997914
>> Group Owner: openembedded-core+owner@lists.openembedded.org
>> Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub [raj.khem@gmail.com]
>> -=-=-=-=-=-=-=-=-=-=-=-
>>
Khem Raj Dec. 12, 2023, 7:43 p.m. UTC | #3
I am on arch so no py-bt with gdb out of box but here is bt

#0  0x00007fa1e54ee367 in wait4 () from
/mnt/b/yoe/master/build/tmp/sysroots-uninative/x86_64-linux/lib/libc.so.6
#1  0x00007fa1e5a858b9 in ?? () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#2  0x00007fa1e598e94a in ?? () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#3  0x00007fa1e594bf03 in PyObject_Vectorcall () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#4  0x00007fa1e5900063 in _PyEval_EvalFrameDefault () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#5  0x00007fa1e5a1da40 in ?? () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#6  0x00007fa1e5a1daf5 in PyEval_EvalCode () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#7  0x00007fa1e5a5deb3 in ?? () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#8  0x00007fa1e5a5e0d6 in ?? () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#9  0x00007fa1e5a5e1b0 in ?? () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#10 0x00007fa1e5a60b9b in _PyRun_SimpleFileObject () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#11 0x00007fa1e5a6111c in _PyRun_AnyFileObject () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#12 0x00007fa1e5a7ddf0 in Py_RunMain () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#13 0x00007fa1e5a7e32a in Py_BytesMain () from
/mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
#14 0x00007fa1e5423efb in __libc_start_call_main () from
/mnt/b/yoe/master/build/tmp/sysroots-uninative/x86_64-linux/lib/libc.so.6
#15 0x00007fa1e5423fb9 in __libc_start_main () from
/mnt/b/yoe/master/build/tmp/sysroots-uninative/x86_64-linux/lib/libc.so.6
#16 0x00005581e2c53075 in _start ()

since its using native libs and bins there is no debug info perhaps
due to it being stripped and not kept around as well.

On Tue, Dec 12, 2023 at 10:51 AM Alex Bennée <alex.bennee@linaro.org> wrote:
>
> Khem Raj <raj.khem@gmail.com> writes:
>
> > this version hangs the run forever.
>
> Do you have a reproducer? In my "make test" case it works fine.
>
> Alternatively you could gdb attach to the hanging python and use py-bt
> to generate a python backtrace.
>
> >
> > On Mon, Dec 11, 2023 at 7:57 AM Alex Bennée <alex.bennee@linaro.org> wrote:
> >>
> >> If qemurunner doesn't continuously drain stdout we will eventually
> >> cause QEMU to block while trying to write to the pipe. This can
> >> manifest itself if the guest has for example configured its serial
> >> ports to output via stdio even if the test itself is using a TCP
> >> console or SSH to run things.
> >>
> >> To do this:
> >>
> >>   - always create a logging thread regardless of serial_ports
> >>   - use a semaphore between main and logging threads
> >>   - move the login detection into the logging thread
> >>   - wait until the second acquire before continuing
> >>
> >> This doesn't address a potential overflow of stderr although generally
> >> stderr from QEMU will be a lot less likely to block due to the volume
> >> of data.
> >>
> >> Signed-off-by: Alex Bennée <alex.bennee@linaro.org>
> >> Cc: Mikko Rapeli <mikko.rapeli@linaro.org>
> >> ---
> >>  meta/lib/oeqa/utils/qemurunner.py | 128 ++++++++++++++++++------------
> >>  1 file changed, 78 insertions(+), 50 deletions(-)
> >>
> >> diff --git a/meta/lib/oeqa/utils/qemurunner.py b/meta/lib/oeqa/utils/qemurunner.py
> >> index 29fe271976..b768c08f04 100644
> >> --- a/meta/lib/oeqa/utils/qemurunner.py
> >> +++ b/meta/lib/oeqa/utils/qemurunner.py
> >> @@ -207,8 +207,7 @@ class QemuRunner:
> >>          self.logger.info("QMP Available for connection at %s" % (qmp_port2))
> >>
> >>          try:
> >> -            if self.serial_ports >= 2:
> >> -                self.threadsock, threadport = self.create_socket()
> >> +            self.threadsock, threadport = self.create_socket()
> >>              self.server_socket, self.serverport = self.create_socket()
> >>          except socket.error as msg:
> >>              self.logger.error("Failed to create listening socket: %s" % msg[1])
> >> @@ -243,6 +242,7 @@ class QemuRunner:
> >>          # to be a proper fix but this will suffice for now.
> >>          self.runqemu = subprocess.Popen(launch_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, preexec_fn=os.setpgrp, env=env, cwd=self.tmpdir)
> >>          output = self.runqemu.stdout
> >> +        output_drain = output
> >>          launch_time = time.time()
> >>
> >>          #
> >> @@ -431,21 +431,30 @@ class QemuRunner:
> >>          self.logger.debug("Target IP: %s" % self.ip)
> >>          self.logger.debug("Server IP: %s" % self.server_ip)
> >>
> >> -        if self.serial_ports >= 2:
> >> -            self.thread = LoggingThread(self.log, self.threadsock, self.logger)
> >> -            self.thread.start()
> >> -            if not self.thread.connection_established.wait(self.boottime):
> >> -                self.logger.error("Didn't receive a console connection from qemu. "
> >> -                             "Here is the qemu command line used:\n%s\nand "
> >> -                             "output from runqemu:\n%s" % (cmdline, out))
> >> -                self.stop_thread()
> >> -                return False
> >> +        # Create and hold onto the login semaphore, this will block
> >> +        # the LoggingThread until we are ready
> >> +        login_semaphore = threading.Semaphore()
> >> +        login_semaphore.acquire()
> >> +
> >> +        self.thread = LoggingThread(self.log, self.threadsock,
> >> +                                    self.runqemu.stdout, self.boot_patterns['search_reached_prompt'],
> >> +                                    self.logger, login_semaphore)
> >> +
> >> +        self.thread.start()
> >> +        login_semaphore.release()
> >> +
> >> +        # if not self.thread.connection_established.wait(self.boottime):
> >> +        #     self.logger.error("Didn't receive a console connection from qemu. "
> >> +        #                       "Here is the qemu command line used:\n%s\nand "
> >> +        #                       "output from runqemu:\n%s" % (cmdline, out))
> >> +        #     self.stop_thread()
> >> +        #     return False
> >>
> >>          self.logger.debug("Output from runqemu:\n%s", out)
> >>          self.logger.debug("Waiting at most %d seconds for login banner (%s)" %
> >>                            (self.boottime, time.strftime("%D %H:%M:%S")))
> >>          endtime = time.time() + self.boottime
> >> -        filelist = [self.server_socket, self.runqemu.stdout]
> >> +        filelist = [self.server_socket]
> >>          reachedlogin = False
> >>          stopread = False
> >>          qemusock = None
> >> @@ -464,46 +473,19 @@ class QemuRunner:
> >>                      filelist.remove(self.server_socket)
> >>                      self.logger.debug("Connection from %s:%s" % addr)
> >>                  else:
> >> -                    # try to avoid reading only a single character at a time
> >> -                    time.sleep(0.1)
> >> -                    if hasattr(file, 'read'):
> >> -                        read = file.read(1024)
> >> -                    elif hasattr(file, 'recv'):
> >> -                        read = file.recv(1024)
> >> -                    else:
> >> -                        self.logger.error('Invalid file type: %s\n%s' % (file))
> >> -                        read = b''
> >> -
> >> -                    self.logger.debug2('Partial boot log:\n%s' % (read.decode('utf-8', errors='backslashreplace')))
> >> -                    data = data + read
> >> -                    if data:
> >> -                        bootlog += data
> >> -                        self.log(data, extension = ".2")
> >> -                        data = b''
> >> -
> >> -                        if bytes(self.boot_patterns['search_reached_prompt'], 'utf-8') in bootlog:
> >> -                            self.server_socket.close()
> >> -                            self.server_socket = qemusock
> >> -                            stopread = True
> >> -                            reachedlogin = True
> >> -                            self.logger.debug("Reached login banner in %.2f seconds (%s)" %
> >> -                                              (time.time() - (endtime - self.boottime),
> >> -                                              time.strftime("%D %H:%M:%S")))
> >> -                    else:
> >> -                        # no need to check if reachedlogin unless we support multiple connections
> >> -                        self.logger.debug("QEMU socket disconnected before login banner reached. (%s)" %
> >> -                                          time.strftime("%D %H:%M:%S"))
> >> -                        filelist.remove(file)
> >> -                        file.close()
> >> +                    if login_semaphore.acquire(blocking=False):
> >> +                        self.server_socket.close()
> >> +                        self.server_socket = qemusock
> >>                          stopread = True
> >> +                        reachedlogin = True
> >> +
> >> +        self.logger.debug("continuing on....")
> >>
> >>          if not reachedlogin:
> >>              if time.time() >= endtime:
> >>                  self.logger.warning("Target didn't reach login banner in %d seconds (%s)" %
> >>                                    (self.boottime, time.strftime("%D %H:%M:%S")))
> >>              tail = lambda l: "\n".join(l.splitlines()[-25:])
> >> -            bootlog = self.decode_qemulog(bootlog)
> >> -            self.logger.warning("Last 25 lines of login console (%d):\n%s" % (len(bootlog), tail(bootlog)))
> >>              self.logger.warning("Last 25 lines of all logging (%d):\n%s" % (len(self.msg), tail(self.msg)))
> >>              self.logger.warning("Check full boot log: %s" % self.logfile)
> >>              self.stop()
> >> @@ -539,6 +521,7 @@ class QemuRunner:
> >>                  self.logger.warning("The output:\n%s" % output)
> >>          except:
> >>              self.logger.warning("Serial console failed while trying to login")
> >> +
> >>          return True
> >>
> >>      def stop(self):
> >> @@ -696,14 +679,16 @@ class QemuRunner:
> >>                      status = 1
> >>          return (status, str(data))
> >>
> >> -# This class is for reading data from a socket and passing it to logfunc
> >> -# to be processed. It's completely event driven and has a straightforward
> >> -# event loop. The mechanism for stopping the thread is a simple pipe which
> >> -# will wake up the poll and allow for tearing everything down.
> >> +# This class is for reading data from sockets and QEMU's stdio output
> >> +# and passing it to logfunc to be processed. It's completely event
> >> +# driven and has a straightforward event loop. The mechanism for
> >> +# stopping the thread is a simple pipe which will wake up the poll and
> >> +# allow for tearing everything down.
> >>  class LoggingThread(threading.Thread):
> >> -    def __init__(self, logfunc, sock, logger):
> >> +    def __init__(self, logfunc, sock, qemu_stdio, prompt, logger, semaphore):
> >>          self.connection_established = threading.Event()
> >>          self.serversock = sock
> >> +        self.qemu_stdio = qemu_stdio
> >>          self.logfunc = logfunc
> >>          self.logger = logger
> >>          self.readsock = None
> >> @@ -713,9 +698,19 @@ class LoggingThread(threading.Thread):
> >>          self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
> >>          self.readevents = select.POLLIN | select.POLLPRI
> >>
> >> +        # tracking until we see prompt
> >> +        self.prompt = prompt
> >> +        self.prompt_seen = False
> >> +        self.boot_log = b''
> >> +        self.semaphore = semaphore
> >> +        self.boottime = time.time()
> >> +
> >>          threading.Thread.__init__(self, target=self.threadtarget)
> >>
> >>      def threadtarget(self):
> >> +        # we acquire until we see the boot prompt
> >> +        self.semaphore.acquire()
> >> +
> >>          try:
> >>              self.eventloop()
> >>          finally:
> >> @@ -750,6 +745,7 @@ class LoggingThread(threading.Thread):
> >>          event_read_mask = self.errorevents | self.readevents
> >>          poll.register(self.serversock.fileno())
> >>          poll.register(self.readpipe, event_read_mask)
> >> +        poll.register(self.qemu_stdio, event_read_mask)
> >>
> >>          breakout = False
> >>          self.running = True
> >> @@ -757,8 +753,12 @@ class LoggingThread(threading.Thread):
> >>          while not breakout:
> >>              events = poll.poll()
> >>              for event in events:
> >> +
> >> +                self.logger.debug(f"event is {event}")
> >> +
> >>                  # An error occurred, bail out
> >>                  if event[1] & self.errorevents:
> >> +                    self.logger.debug("error event")
> >>                      raise Exception(self.stringify_event(event[1]))
> >>
> >>                  # Event to stop the thread
> >> @@ -767,6 +767,10 @@ class LoggingThread(threading.Thread):
> >>                      breakout = True
> >>                      break
> >>
> >> +                # stdio data to be logged
> >> +                elif self.qemu_stdio.fileno() == event[0]:
> >> +                    self.consume_qemu_stdio()
> >> +
> >>                  # A connection request was received
> >>                  elif self.serversock.fileno() == event[0]:
> >>                      self.logger.debug("Connection request received")
> >> @@ -783,6 +787,30 @@ class LoggingThread(threading.Thread):
> >>                      data = self.recv(1024)
> >>                      self.logfunc(data)
> >>
> >> +
> >> +    # Consume QEMU's stdio output, checking for login strings
> >> +    def consume_qemu_stdio(self):
> >> +        # try to avoid reading only a single character at a time
> >> +        time.sleep(0.1)
> >> +        read = self.qemu_stdio.read(1024)
> >> +
> >> +        # log what we have seen
> >> +        decoded_data = read.decode('utf-8', errors='backslashreplace')
> >> +        if self.prompt_seen:
> >> +            self.logger.debug2('Post login log:\n%s' % decoded_data)
> >> +        else:
> >> +            self.logger.debug2('Pre login log:\n%s' % decoded_data)
> >> +
> >> +        if not self.prompt_seen and read:
> >> +            self.boot_log += read
> >> +
> >> +            if bytes(self.prompt, 'utf-8') in self.boot_log:
> >> +                time_to_login = time.time() - self.boottime
> >> +                self.logger.debug("Reached login banner in %.2f seconds (%s)" %
> >> +                                  (time_to_login,  time.strftime("%D %H:%M:%S")))
> >> +                self.semaphore.release()
> >> +                self.prompt_seen = True
> >> +
> >>      # Since the socket is non-blocking make sure to honor EAGAIN
> >>      # and EWOULDBLOCK.
> >>      def recv(self, count):
> >> --
> >> 2.39.2
> >>
> >>
> >> -=-=-=-=-=-=-=-=-=-=-=-
> >> Links: You receive all messages sent to this group.
> >> View/Reply Online (#192177): https://lists.openembedded.org/g/openembedded-core/message/192177
> >> Mute This Topic: https://lists.openembedded.org/mt/103111135/1997914
> >> Group Owner: openembedded-core+owner@lists.openembedded.org
> >> Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub [raj.khem@gmail.com]
> >> -=-=-=-=-=-=-=-=-=-=-=-
> >>
>
> --
> Alex Bennée
> Virtualisation Tech Lead @ Linaro
Alex Bennée Dec. 12, 2023, 10:57 p.m. UTC | #4
Khem Raj <raj.khem@gmail.com> writes:

> I am on arch so no py-bt with gdb out of box but here is bt
>
> #0  0x00007fa1e54ee367 in wait4 () from
> /mnt/b/yoe/master/build/tmp/sysroots-uninative/x86_64-linux/lib/libc.so.6
> #1  0x00007fa1e5a858b9 in ?? () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #2  0x00007fa1e598e94a in ?? () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #3  0x00007fa1e594bf03 in PyObject_Vectorcall () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #4  0x00007fa1e5900063 in _PyEval_EvalFrameDefault () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #5  0x00007fa1e5a1da40 in ?? () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #6  0x00007fa1e5a1daf5 in PyEval_EvalCode () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #7  0x00007fa1e5a5deb3 in ?? () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #8  0x00007fa1e5a5e0d6 in ?? () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #9  0x00007fa1e5a5e1b0 in ?? () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #10 0x00007fa1e5a60b9b in _PyRun_SimpleFileObject () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #11 0x00007fa1e5a6111c in _PyRun_AnyFileObject () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #12 0x00007fa1e5a7ddf0 in Py_RunMain () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #13 0x00007fa1e5a7e32a in Py_BytesMain () from
> /mnt/b/yoe/master/build/tmp/work/qemux86_64-yoe-linux/core-image-ptest-gstreamer1.0/1.0/recipe-sysroot-native/usr/bin/python3-native/../../lib/libpython3.11.so.1.0
> #14 0x00007fa1e5423efb in __libc_start_call_main () from
> /mnt/b/yoe/master/build/tmp/sysroots-uninative/x86_64-linux/lib/libc.so.6
> #15 0x00007fa1e5423fb9 in __libc_start_main () from
> /mnt/b/yoe/master/build/tmp/sysroots-uninative/x86_64-linux/lib/libc.so.6
> #16 0x00005581e2c53075 in _start ()
>
> since its using native libs and bins there is no debug info perhaps
> due to it being stripped and not kept around as well.

No - it won't work without debug symbols to poke around in pythons
runtime. Can you share the log?

Do you see anything from:

                    self.logger.warning('Extra log data read: %s\n' % (data.decode('utf-8', errors='backslashreplace')))
                except Exception as e:
                    self.logger.warning('Extra log data exception %s' % repr(e))


maybe we should have a: self.stop_thread() once we catch the exception?
Richard Purdie Dec. 18, 2023, 11:23 a.m. UTC | #5
On Mon, 2023-12-11 at 15:57 +0000, Alex Bennée wrote:
> If qemurunner doesn't continuously drain stdout we will eventually
> cause QEMU to block while trying to write to the pipe. This can
> manifest itself if the guest has for example configured its serial
> ports to output via stdio even if the test itself is using a TCP
> console or SSH to run things.
> 
> To do this:
> 
>   - always create a logging thread regardless of serial_ports
>   - use a semaphore between main and logging threads
>   - move the login detection into the logging thread
>   - wait until the second acquire before continuing
> 
> This doesn't address a potential overflow of stderr although generally
> stderr from QEMU will be a lot less likely to block due to the volume
> of data.
> 
> Signed-off-by: Alex Bennée <alex.bennee@linaro.org>
> Cc: Mikko Rapeli <mikko.rapeli@linaro.org>
> ---
>  meta/lib/oeqa/utils/qemurunner.py | 128 ++++++++++++++++++------------
>  1 file changed, 78 insertions(+), 50 deletions(-)
> 
> diff --git a/meta/lib/oeqa/utils/qemurunner.py b/meta/lib/oeqa/utils/qemurunner.py
> index 29fe271976..b768c08f04 100644
> --- a/meta/lib/oeqa/utils/qemurunner.py
> +++ b/meta/lib/oeqa/utils/qemurunner.py
> @@ -207,8 +207,7 @@ class QemuRunner:
>          self.logger.info("QMP Available for connection at %s" % (qmp_port2))
>  
>          try:
> -            if self.serial_ports >= 2:
> -                self.threadsock, threadport = self.create_socket()
> +            self.threadsock, threadport = self.create_socket()
>              self.server_socket, self.serverport = self.create_socket()
>          except socket.error as msg:
>              self.logger.error("Failed to create listening socket: %s" % msg[1])
> @@ -243,6 +242,7 @@ class QemuRunner:
>          # to be a proper fix but this will suffice for now.
>          self.runqemu = subprocess.Popen(launch_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, preexec_fn=os.setpgrp, env=env, cwd=self.tmpdir)
>          output = self.runqemu.stdout
> +        output_drain = output
>          launch_time = time.time()
>  
>          #
> @@ -431,21 +431,30 @@ class QemuRunner:
>          self.logger.debug("Target IP: %s" % self.ip)
>          self.logger.debug("Server IP: %s" % self.server_ip)
>  
> -        if self.serial_ports >= 2:
> -            self.thread = LoggingThread(self.log, self.threadsock, self.logger)
> -            self.thread.start()
> -            if not self.thread.connection_established.wait(self.boottime):
> -                self.logger.error("Didn't receive a console connection from qemu. "
> -                             "Here is the qemu command line used:\n%s\nand "
> -                             "output from runqemu:\n%s" % (cmdline, out))
> -                self.stop_thread()
> -                return False
> +        # Create and hold onto the login semaphore, this will block
> +        # the LoggingThread until we are ready
> +        login_semaphore = threading.Semaphore()
> +        login_semaphore.acquire()
> +
> +        self.thread = LoggingThread(self.log, self.threadsock,
> +                                    self.runqemu.stdout, self.boot_patterns['search_reached_prompt'],
> +                                    self.logger, login_semaphore)
> +
> +        self.thread.start()
> +        login_semaphore.release()
> +
> +        # if not self.thread.connection_established.wait(self.boottime):
> +        #     self.logger.error("Didn't receive a console connection from qemu. "
> +        #                       "Here is the qemu command line used:\n%s\nand "
> +        #                       "output from runqemu:\n%s" % (cmdline, out))
> +        #     self.stop_thread()
> +        #     return False
>  
>          self.logger.debug("Output from runqemu:\n%s", out)
>          self.logger.debug("Waiting at most %d seconds for login banner (%s)" %
>                            (self.boottime, time.strftime("%D %H:%M:%S")))
>          endtime = time.time() + self.boottime
> -        filelist = [self.server_socket, self.runqemu.stdout]
> +        filelist = [self.server_socket]
>          reachedlogin = False
>          stopread = False
>          qemusock = None
> @@ -464,46 +473,19 @@ class QemuRunner:
>                      filelist.remove(self.server_socket)
>                      self.logger.debug("Connection from %s:%s" % addr)
>                  else:
> -                    # try to avoid reading only a single character at a time
> -                    time.sleep(0.1)
> -                    if hasattr(file, 'read'):
> -                        read = file.read(1024)
> -                    elif hasattr(file, 'recv'):
> -                        read = file.recv(1024)
> -                    else:
> -                        self.logger.error('Invalid file type: %s\n%s' % (file))
> -                        read = b''
> -
> -                    self.logger.debug2('Partial boot log:\n%s' % (read.decode('utf-8', errors='backslashreplace')))
> -                    data = data + read
> -                    if data:
> -                        bootlog += data
> -                        self.log(data, extension = ".2")
> -                        data = b''
> -
> -                        if bytes(self.boot_patterns['search_reached_prompt'], 'utf-8') in bootlog:
> -                            self.server_socket.close()
> -                            self.server_socket = qemusock
> -                            stopread = True
> -                            reachedlogin = True
> -                            self.logger.debug("Reached login banner in %.2f seconds (%s)" %
> -                                              (time.time() - (endtime - self.boottime),
> -                                              time.strftime("%D %H:%M:%S")))
> -                    else:
> -                        # no need to check if reachedlogin unless we support multiple connections
> -                        self.logger.debug("QEMU socket disconnected before login banner reached. (%s)" %
> -                                          time.strftime("%D %H:%M:%S"))
> -                        filelist.remove(file)
> -                        file.close()
> +                    if login_semaphore.acquire(blocking=False):
> +                        self.server_socket.close()
> +                        self.server_socket = qemusock
>                          stopread = True
> +                        reachedlogin = True
> +
> +        self.logger.debug("continuing on....")
>  
>          if not reachedlogin:
>              if time.time() >= endtime:
>                  self.logger.warning("Target didn't reach login banner in %d seconds (%s)" %
>                                    (self.boottime, time.strftime("%D %H:%M:%S")))
>              tail = lambda l: "\n".join(l.splitlines()[-25:])
> -            bootlog = self.decode_qemulog(bootlog)
> -            self.logger.warning("Last 25 lines of login console (%d):\n%s" % (len(bootlog), tail(bootlog)))
>              self.logger.warning("Last 25 lines of all logging (%d):\n%s" % (len(self.msg), tail(self.msg)))
>              self.logger.warning("Check full boot log: %s" % self.logfile)
>              self.stop()
> @@ -539,6 +521,7 @@ class QemuRunner:
>                  self.logger.warning("The output:\n%s" % output)
>          except:
>              self.logger.warning("Serial console failed while trying to login")
> +
>          return True
>  
>      def stop(self):
> @@ -696,14 +679,16 @@ class QemuRunner:
>                      status = 1
>          return (status, str(data))
>  
> -# This class is for reading data from a socket and passing it to logfunc
> -# to be processed. It's completely event driven and has a straightforward
> -# event loop. The mechanism for stopping the thread is a simple pipe which
> -# will wake up the poll and allow for tearing everything down.
> +# This class is for reading data from sockets and QEMU's stdio output
> +# and passing it to logfunc to be processed. It's completely event
> +# driven and has a straightforward event loop. The mechanism for
> +# stopping the thread is a simple pipe which will wake up the poll and
> +# allow for tearing everything down.
>  class LoggingThread(threading.Thread):
> -    def __init__(self, logfunc, sock, logger):
> +    def __init__(self, logfunc, sock, qemu_stdio, prompt, logger, semaphore):
>          self.connection_established = threading.Event()
>          self.serversock = sock
> +        self.qemu_stdio = qemu_stdio
>          self.logfunc = logfunc
>          self.logger = logger
>          self.readsock = None
> @@ -713,9 +698,19 @@ class LoggingThread(threading.Thread):
>          self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
>          self.readevents = select.POLLIN | select.POLLPRI
>  
> +        # tracking until we see prompt
> +        self.prompt = prompt
> +        self.prompt_seen = False
> +        self.boot_log = b''
> +        self.semaphore = semaphore
> +        self.boottime = time.time()
> +
>          threading.Thread.__init__(self, target=self.threadtarget)
>  
>      def threadtarget(self):
> +        # we acquire until we see the boot prompt
> +        self.semaphore.acquire()
> +
>          try:
>              self.eventloop()
>          finally:
> @@ -750,6 +745,7 @@ class LoggingThread(threading.Thread):
>          event_read_mask = self.errorevents | self.readevents
>          poll.register(self.serversock.fileno())
>          poll.register(self.readpipe, event_read_mask)
> +        poll.register(self.qemu_stdio, event_read_mask)
>  
>          breakout = False
>          self.running = True
> @@ -757,8 +753,12 @@ class LoggingThread(threading.Thread):
>          while not breakout:
>              events = poll.poll()
>              for event in events:
> +
> +                self.logger.debug(f"event is {event}")
> +
>                  # An error occurred, bail out
>                  if event[1] & self.errorevents:
> +                    self.logger.debug("error event")
>                      raise Exception(self.stringify_event(event[1]))
>  
>                  # Event to stop the thread
> @@ -767,6 +767,10 @@ class LoggingThread(threading.Thread):
>                      breakout = True
>                      break
>  
> +                # stdio data to be logged
> +                elif self.qemu_stdio.fileno() == event[0]:
> +                    self.consume_qemu_stdio()
> +
>                  # A connection request was received
>                  elif self.serversock.fileno() == event[0]:
>                      self.logger.debug("Connection request received")
> @@ -783,6 +787,30 @@ class LoggingThread(threading.Thread):
>                      data = self.recv(1024)
>                      self.logfunc(data)
>  
> +
> +    # Consume QEMU's stdio output, checking for login strings
> +    def consume_qemu_stdio(self):
> +        # try to avoid reading only a single character at a time
> +        time.sleep(0.1)
> +        read = self.qemu_stdio.read(1024)
> +
> +        # log what we have seen
> +        decoded_data = read.decode('utf-8', errors='backslashreplace')
> +        if self.prompt_seen:
> +            self.logger.debug2('Post login log:\n%s' % decoded_data)
> +        else:
> +            self.logger.debug2('Pre login log:\n%s' % decoded_data)
> +
> +        if not self.prompt_seen and read:
> +            self.boot_log += read
> +
> +            if bytes(self.prompt, 'utf-8') in self.boot_log:
> +                time_to_login = time.time() - self.boottime
> +                self.logger.debug("Reached login banner in %.2f seconds (%s)" %
> +                                  (time_to_login,  time.strftime("%D %H:%M:%S")))
> +                self.semaphore.release()
> +                self.prompt_seen = True
> +
>      # Since the socket is non-blocking make sure to honor EAGAIN
>      # and EWOULDBLOCK.
>      def recv(self, count):


FWIW I tried testing this locally and "bitbake core-image-full-cmdline
-c testimage" just hangs, it can't work out the login prompt.

This code has been the source of a lot of races issues and intermittent
failures which we only "just" got to the bottom of and introducing a
load more isn't something I'm feeling great about. I'll try and find
some time to see if I can work out any better approach.

Cheers,

Richard
Richard Purdie Dec. 18, 2023, 5:39 p.m. UTC | #6
On Mon, 2023-12-18 at 11:23 +0000, Richard Purdie via
lists.openembedded.org wrote:
> On Mon, 2023-12-11 at 15:57 +0000, Alex Bennée wrote:
> > If qemurunner doesn't continuously drain stdout we will eventually
> > cause QEMU to block while trying to write to the pipe. This can
> > manifest itself if the guest has for example configured its serial
> > ports to output via stdio even if the test itself is using a TCP
> > console or SSH to run things.
> > 
> > To do this:
> > 
> >   - always create a logging thread regardless of serial_ports
> >   - use a semaphore between main and logging threads
> >   - move the login detection into the logging thread
> >   - wait until the second acquire before continuing
> > 
> > This doesn't address a potential overflow of stderr although generally
> > stderr from QEMU will be a lot less likely to block due to the volume
> > of data.
> > 
> > Signed-off-by: Alex Bennée <alex.bennee@linaro.org>
> > Cc: Mikko Rapeli <mikko.rapeli@linaro.org>
> > ---
> > 
> 
> "bitbake core-image-full-cmdline
> -c testimage" just hangs, it can't work out the login prompt.
> 
> This code has been the source of a lot of races issues and intermittent
> failures which we only "just" got to the bottom of and introducing a
> load more isn't something I'm feeling great about. I'll try and find
> some time to see if I can work out any better approach.

I've posted an alternative couple of patches to this. Could you see if
they help please?

I realised that we may be having the overflowing buffer problem on the
serial connection as well as stdout.

qemu stderr is being sent to stdout so hopefully there isn't an issue
there.

Cheers,

Richard
diff mbox series

Patch

diff --git a/meta/lib/oeqa/utils/qemurunner.py b/meta/lib/oeqa/utils/qemurunner.py
index 29fe271976..b768c08f04 100644
--- a/meta/lib/oeqa/utils/qemurunner.py
+++ b/meta/lib/oeqa/utils/qemurunner.py
@@ -207,8 +207,7 @@  class QemuRunner:
         self.logger.info("QMP Available for connection at %s" % (qmp_port2))
 
         try:
-            if self.serial_ports >= 2:
-                self.threadsock, threadport = self.create_socket()
+            self.threadsock, threadport = self.create_socket()
             self.server_socket, self.serverport = self.create_socket()
         except socket.error as msg:
             self.logger.error("Failed to create listening socket: %s" % msg[1])
@@ -243,6 +242,7 @@  class QemuRunner:
         # to be a proper fix but this will suffice for now.
         self.runqemu = subprocess.Popen(launch_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, preexec_fn=os.setpgrp, env=env, cwd=self.tmpdir)
         output = self.runqemu.stdout
+        output_drain = output
         launch_time = time.time()
 
         #
@@ -431,21 +431,30 @@  class QemuRunner:
         self.logger.debug("Target IP: %s" % self.ip)
         self.logger.debug("Server IP: %s" % self.server_ip)
 
-        if self.serial_ports >= 2:
-            self.thread = LoggingThread(self.log, self.threadsock, self.logger)
-            self.thread.start()
-            if not self.thread.connection_established.wait(self.boottime):
-                self.logger.error("Didn't receive a console connection from qemu. "
-                             "Here is the qemu command line used:\n%s\nand "
-                             "output from runqemu:\n%s" % (cmdline, out))
-                self.stop_thread()
-                return False
+        # Create and hold onto the login semaphore, this will block
+        # the LoggingThread until we are ready
+        login_semaphore = threading.Semaphore()
+        login_semaphore.acquire()
+
+        self.thread = LoggingThread(self.log, self.threadsock,
+                                    self.runqemu.stdout, self.boot_patterns['search_reached_prompt'],
+                                    self.logger, login_semaphore)
+
+        self.thread.start()
+        login_semaphore.release()
+
+        # if not self.thread.connection_established.wait(self.boottime):
+        #     self.logger.error("Didn't receive a console connection from qemu. "
+        #                       "Here is the qemu command line used:\n%s\nand "
+        #                       "output from runqemu:\n%s" % (cmdline, out))
+        #     self.stop_thread()
+        #     return False
 
         self.logger.debug("Output from runqemu:\n%s", out)
         self.logger.debug("Waiting at most %d seconds for login banner (%s)" %
                           (self.boottime, time.strftime("%D %H:%M:%S")))
         endtime = time.time() + self.boottime
-        filelist = [self.server_socket, self.runqemu.stdout]
+        filelist = [self.server_socket]
         reachedlogin = False
         stopread = False
         qemusock = None
@@ -464,46 +473,19 @@  class QemuRunner:
                     filelist.remove(self.server_socket)
                     self.logger.debug("Connection from %s:%s" % addr)
                 else:
-                    # try to avoid reading only a single character at a time
-                    time.sleep(0.1)
-                    if hasattr(file, 'read'):
-                        read = file.read(1024)
-                    elif hasattr(file, 'recv'):
-                        read = file.recv(1024)
-                    else:
-                        self.logger.error('Invalid file type: %s\n%s' % (file))
-                        read = b''
-
-                    self.logger.debug2('Partial boot log:\n%s' % (read.decode('utf-8', errors='backslashreplace')))
-                    data = data + read
-                    if data:
-                        bootlog += data
-                        self.log(data, extension = ".2")
-                        data = b''
-
-                        if bytes(self.boot_patterns['search_reached_prompt'], 'utf-8') in bootlog:
-                            self.server_socket.close()
-                            self.server_socket = qemusock
-                            stopread = True
-                            reachedlogin = True
-                            self.logger.debug("Reached login banner in %.2f seconds (%s)" %
-                                              (time.time() - (endtime - self.boottime),
-                                              time.strftime("%D %H:%M:%S")))
-                    else:
-                        # no need to check if reachedlogin unless we support multiple connections
-                        self.logger.debug("QEMU socket disconnected before login banner reached. (%s)" %
-                                          time.strftime("%D %H:%M:%S"))
-                        filelist.remove(file)
-                        file.close()
+                    if login_semaphore.acquire(blocking=False):
+                        self.server_socket.close()
+                        self.server_socket = qemusock
                         stopread = True
+                        reachedlogin = True
+
+        self.logger.debug("continuing on....")
 
         if not reachedlogin:
             if time.time() >= endtime:
                 self.logger.warning("Target didn't reach login banner in %d seconds (%s)" %
                                   (self.boottime, time.strftime("%D %H:%M:%S")))
             tail = lambda l: "\n".join(l.splitlines()[-25:])
-            bootlog = self.decode_qemulog(bootlog)
-            self.logger.warning("Last 25 lines of login console (%d):\n%s" % (len(bootlog), tail(bootlog)))
             self.logger.warning("Last 25 lines of all logging (%d):\n%s" % (len(self.msg), tail(self.msg)))
             self.logger.warning("Check full boot log: %s" % self.logfile)
             self.stop()
@@ -539,6 +521,7 @@  class QemuRunner:
                 self.logger.warning("The output:\n%s" % output)
         except:
             self.logger.warning("Serial console failed while trying to login")
+
         return True
 
     def stop(self):
@@ -696,14 +679,16 @@  class QemuRunner:
                     status = 1
         return (status, str(data))
 
-# This class is for reading data from a socket and passing it to logfunc
-# to be processed. It's completely event driven and has a straightforward
-# event loop. The mechanism for stopping the thread is a simple pipe which
-# will wake up the poll and allow for tearing everything down.
+# This class is for reading data from sockets and QEMU's stdio output
+# and passing it to logfunc to be processed. It's completely event
+# driven and has a straightforward event loop. The mechanism for
+# stopping the thread is a simple pipe which will wake up the poll and
+# allow for tearing everything down.
 class LoggingThread(threading.Thread):
-    def __init__(self, logfunc, sock, logger):
+    def __init__(self, logfunc, sock, qemu_stdio, prompt, logger, semaphore):
         self.connection_established = threading.Event()
         self.serversock = sock
+        self.qemu_stdio = qemu_stdio
         self.logfunc = logfunc
         self.logger = logger
         self.readsock = None
@@ -713,9 +698,19 @@  class LoggingThread(threading.Thread):
         self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
         self.readevents = select.POLLIN | select.POLLPRI
 
+        # tracking until we see prompt
+        self.prompt = prompt
+        self.prompt_seen = False
+        self.boot_log = b''
+        self.semaphore = semaphore
+        self.boottime = time.time()
+
         threading.Thread.__init__(self, target=self.threadtarget)
 
     def threadtarget(self):
+        # we acquire until we see the boot prompt
+        self.semaphore.acquire()
+
         try:
             self.eventloop()
         finally:
@@ -750,6 +745,7 @@  class LoggingThread(threading.Thread):
         event_read_mask = self.errorevents | self.readevents
         poll.register(self.serversock.fileno())
         poll.register(self.readpipe, event_read_mask)
+        poll.register(self.qemu_stdio, event_read_mask)
 
         breakout = False
         self.running = True
@@ -757,8 +753,12 @@  class LoggingThread(threading.Thread):
         while not breakout:
             events = poll.poll()
             for event in events:
+
+                self.logger.debug(f"event is {event}")
+
                 # An error occurred, bail out
                 if event[1] & self.errorevents:
+                    self.logger.debug("error event")
                     raise Exception(self.stringify_event(event[1]))
 
                 # Event to stop the thread
@@ -767,6 +767,10 @@  class LoggingThread(threading.Thread):
                     breakout = True
                     break
 
+                # stdio data to be logged
+                elif self.qemu_stdio.fileno() == event[0]:
+                    self.consume_qemu_stdio()
+
                 # A connection request was received
                 elif self.serversock.fileno() == event[0]:
                     self.logger.debug("Connection request received")
@@ -783,6 +787,30 @@  class LoggingThread(threading.Thread):
                     data = self.recv(1024)
                     self.logfunc(data)
 
+
+    # Consume QEMU's stdio output, checking for login strings
+    def consume_qemu_stdio(self):
+        # try to avoid reading only a single character at a time
+        time.sleep(0.1)
+        read = self.qemu_stdio.read(1024)
+
+        # log what we have seen
+        decoded_data = read.decode('utf-8', errors='backslashreplace')
+        if self.prompt_seen:
+            self.logger.debug2('Post login log:\n%s' % decoded_data)
+        else:
+            self.logger.debug2('Pre login log:\n%s' % decoded_data)
+
+        if not self.prompt_seen and read:
+            self.boot_log += read
+
+            if bytes(self.prompt, 'utf-8') in self.boot_log:
+                time_to_login = time.time() - self.boottime
+                self.logger.debug("Reached login banner in %.2f seconds (%s)" %
+                                  (time_to_login,  time.strftime("%D %H:%M:%S")))
+                self.semaphore.release()
+                self.prompt_seen = True
+
     # Since the socket is non-blocking make sure to honor EAGAIN
     # and EWOULDBLOCK.
     def recv(self, count):