Patchwork [bitbake-devel,4/4] xmlrpc: add support for event-observer-only connection

login
register
mail settings
Submitter Alexandru DAMIAN
Date May 31, 2013, 11:06 a.m.
Message ID <1369998409-16560-4-git-send-email-alexandru.damian@intel.com>
Download mbox | patch
Permalink /patch/50937/
State New
Headers show

Comments

Alexandru DAMIAN - May 31, 2013, 11:06 a.m.
From: Alexandru DAMIAN <alexandru.damian@intel.com>

This patch adds support for multiple UI clients acting only as event syncs.
Summary of changes:

bitbake: adds support for --observe-only command line parameter

xmlrpc server: create a Observer-only connection type, and small changes
to accomodate new incoming parameters

event queue: add exclusive access to structures for multithreaded
operation; this is needed by accepting new UI handlers on a different thread

knotty: support for observer-only connections

Other minor cosmetic changes to support new parameters to functions were made

Based on original patch by Bogdan Marinescu <bogdan.a.marinescu@intel.com>

Signed-off-by: Alexandru DAMIAN <alexandru.damian@intel.com>
---
 bin/bitbake             |  8 ++++++-
 lib/bb/event.py         | 63 ++++++++++++++++++++++++++++++++++---------------
 lib/bb/server/xmlrpc.py | 57 +++++++++++++++++++++++++++++++++++++-------
 lib/bb/ui/knotty.py     | 29 +++++++++++++----------
 lib/bb/ui/uievent.py    |  4 ++--
 5 files changed, 119 insertions(+), 42 deletions(-)
Richard Purdie - June 6, 2013, 12:58 p.m.
On Fri, 2013-05-31 at 12:06 +0100, Alex DAMIAN wrote:
> From: Alexandru DAMIAN <alexandru.damian@intel.com>
> 
> This patch adds support for multiple UI clients acting only as event syncs.
> Summary of changes:
> 
> bitbake: adds support for --observe-only command line parameter
> 
> xmlrpc server: create a Observer-only connection type, and small changes
> to accomodate new incoming parameters
> 
> event queue: add exclusive access to structures for multithreaded
> operation; this is needed by accepting new UI handlers on a different thread
> 
> knotty: support for observer-only connections
> 
> Other minor cosmetic changes to support new parameters to functions were made
> 
> Based on original patch by Bogdan Marinescu <bogdan.a.marinescu@intel.com>
> 
> Signed-off-by: Alexandru DAMIAN <alexandru.damian@intel.com>
> ---
>  bin/bitbake             |  8 ++++++-
>  lib/bb/event.py         | 63 ++++++++++++++++++++++++++++++++++---------------
>  lib/bb/server/xmlrpc.py | 57 +++++++++++++++++++++++++++++++++++++-------
>  lib/bb/ui/knotty.py     | 29 +++++++++++++----------
>  lib/bb/ui/uievent.py    |  4 ++--
>  5 files changed, 119 insertions(+), 42 deletions(-)

I merged the other three patches in the series. I was not happy with the
commit message summary as "XXX: fixes for XXX" tells me nothing. In this
case I rewrote them as examples of what looks better but please think
about those a bit more in future.

For this patch I have some concerns.

> diff --git a/bin/bitbake b/bin/bitbake
> index d263cbd..ef0c5d8 100755
> --- a/bin/bitbake
> +++ b/bin/bitbake
> @@ -197,6 +197,9 @@ class BitBakeConfigParameters(cookerdata.ConfigParameters):
>          parser.add_option("", "--remote-server", help = "Connect to the specified server",
>                     action = "store", dest = "remote_server", default = False)
>  
> +        parser.add_option("", "--observe-only", help = "Connect to a server as an observing-only client",
> +                   action = "store_true", dest = "observe_only", default = False)
> +
>          options, targets = parser.parse_args(sys.argv)
>          return options, targets[1:]
>  
> @@ -269,6 +272,9 @@ def main():
>      if configParams.remote_server and configParams.servertype != "xmlrpc":
>          sys.exit("FATAL: If '--remote-server' is defined, we must set the servertype as 'xmlrpc'.\n")
>  
> +    if configParams.observe_only and (not configParams.remote_server or configParams.bind):
> +        sys.exit("FATAL: '--observe-only' can only be used by UI clients connecting to a server.\n")
> +
>      if "BBDEBUG" in os.environ:
>          level = int(os.environ["BBDEBUG"])
>          if level > configuration.debug:
> @@ -295,7 +301,7 @@ def main():
>          server = start_server(servermodule, configParams, configuration)
>      else:
>          # we start a stub server that is actually a XMLRPClient to
> -        server = servermodule.BitBakeXMLRPCClient()
> +        server = servermodule.BitBakeXMLRPCClient(configParams.observe_only)
>          server.saveConnectionDetails(configParams.remote_server)


Do we really need a commandline option for this or is this something the
UIs can figure out and do for themselves?

>      logger.removeHandler(handler)
> diff --git a/lib/bb/event.py b/lib/bb/event.py
> index 2826e35..726d074 100644
> --- a/lib/bb/event.py
> +++ b/lib/bb/event.py
> @@ -33,12 +33,15 @@ import atexit
>  import traceback
>  import bb.utils
>  import bb.compat
> +import threading

I've already mentioned that mixing mutliprocessing and threading in
bitbake has been known to cause problems. Did you investigate those
issues and do we know this is safe here?

I actually don't even understand why we're locking this. Only the server
process should be able to touching this queue.

>  # This is the pid for which we should generate the event. This is set when
>  # the runqueue forks off.
>  worker_pid = 0
>  worker_pipe = None
>  
> +_ui_handlers_lock = threading.Lock()
> +
>  logger = logging.getLogger('BitBake.Event')
>  
>  class Event(object):
> @@ -93,6 +96,8 @@ def fire_class_handlers(event, d):
>              continue
>  
>  ui_queue = []
> +_ui_event_history = []
> +_ui_event_history_lock = threading.Lock()
>  @atexit.register
>  def print_ui_queue():
>      """If we're exiting before a UI has been spawned, display any queued
> @@ -124,22 +129,34 @@ def fire_ui_handlers(event, d):
>          # No UI handlers registered yet, queue up the messages
>          ui_queue.append(event)
>          return
> -
> +    _ui_event_history_lock.acquire()
> +    _ui_event_history.append(event)
> +    _ui_event_history_lock.release()
>      errors = []
> -    for h in _ui_handlers:
> -        #print "Sending event %s" % event
> -        try:
> -             # We use pickle here since it better handles object instances
> -             # which xmlrpc's marshaller does not. Events *must* be serializable
> -             # by pickle.
> -             if hasattr(_ui_handlers[h].event, "sendpickle"):
> -                _ui_handlers[h].event.sendpickle((pickle.dumps(event)))
> -             else:
> -                _ui_handlers[h].event.send(event)
> -        except:
> -            errors.append(h)
> -    for h in errors:
> -        del _ui_handlers[h]
> +    _ui_handlers_lock.acquire()
> +    try:
> +        for h in _ui_handlers:
> +            #print "Sending event %s" % event
> +            try:
> +                 # We use pickle here since it better handles object instances
> +                 # which xmlrpc's marshaller does not. Events *must* be serializable
> +                 # by pickle.
> +                 if hasattr(_ui_handlers[h].event, "sendpickle"):
> +                    _ui_handlers[h].event.sendpickle((pickle.dumps(event)))
> +                 else:
> +                    _ui_handlers[h].event.send(event)
> +            except:
> +                errors.append(h)
> +        for h in errors:
> +            del _ui_handlers[h]
> +    finally:
> +        _ui_handlers_lock.release()
> +
> +def get_event_history():
> +    _ui_event_history_lock.acquire()
> +    evt_copy = _ui_event_history[:]
> +    _ui_event_history_lock.release()
> +    return evt_copy


I'm afraid I don't like this at all. Why do we need to keep event
history? Surely the UIs are meant to query the server for current state,
not rely on a reply of existing events?

Reading the code I can figure out what you're doing and why and answer
my own questions. The commit message however sucks as it doesn't tell me
any of this. The locking and so on also looks inappropriate and
worrisome.

So this patch needs some further thought/work.

Cheers,

Richard

Patch

diff --git a/bin/bitbake b/bin/bitbake
index d263cbd..ef0c5d8 100755
--- a/bin/bitbake
+++ b/bin/bitbake
@@ -197,6 +197,9 @@  class BitBakeConfigParameters(cookerdata.ConfigParameters):
         parser.add_option("", "--remote-server", help = "Connect to the specified server",
                    action = "store", dest = "remote_server", default = False)
 
+        parser.add_option("", "--observe-only", help = "Connect to a server as an observing-only client",
+                   action = "store_true", dest = "observe_only", default = False)
+
         options, targets = parser.parse_args(sys.argv)
         return options, targets[1:]
 
@@ -269,6 +272,9 @@  def main():
     if configParams.remote_server and configParams.servertype != "xmlrpc":
         sys.exit("FATAL: If '--remote-server' is defined, we must set the servertype as 'xmlrpc'.\n")
 
+    if configParams.observe_only and (not configParams.remote_server or configParams.bind):
+        sys.exit("FATAL: '--observe-only' can only be used by UI clients connecting to a server.\n")
+
     if "BBDEBUG" in os.environ:
         level = int(os.environ["BBDEBUG"])
         if level > configuration.debug:
@@ -295,7 +301,7 @@  def main():
         server = start_server(servermodule, configParams, configuration)
     else:
         # we start a stub server that is actually a XMLRPClient to
-        server = servermodule.BitBakeXMLRPCClient()
+        server = servermodule.BitBakeXMLRPCClient(configParams.observe_only)
         server.saveConnectionDetails(configParams.remote_server)
 
     logger.removeHandler(handler)
diff --git a/lib/bb/event.py b/lib/bb/event.py
index 2826e35..726d074 100644
--- a/lib/bb/event.py
+++ b/lib/bb/event.py
@@ -33,12 +33,15 @@  import atexit
 import traceback
 import bb.utils
 import bb.compat
+import threading
 
 # This is the pid for which we should generate the event. This is set when
 # the runqueue forks off.
 worker_pid = 0
 worker_pipe = None
 
+_ui_handlers_lock = threading.Lock()
+
 logger = logging.getLogger('BitBake.Event')
 
 class Event(object):
@@ -93,6 +96,8 @@  def fire_class_handlers(event, d):
             continue
 
 ui_queue = []
+_ui_event_history = []
+_ui_event_history_lock = threading.Lock()
 @atexit.register
 def print_ui_queue():
     """If we're exiting before a UI has been spawned, display any queued
@@ -124,22 +129,34 @@  def fire_ui_handlers(event, d):
         # No UI handlers registered yet, queue up the messages
         ui_queue.append(event)
         return
-
+    _ui_event_history_lock.acquire()
+    _ui_event_history.append(event)
+    _ui_event_history_lock.release()
     errors = []
-    for h in _ui_handlers:
-        #print "Sending event %s" % event
-        try:
-             # We use pickle here since it better handles object instances
-             # which xmlrpc's marshaller does not. Events *must* be serializable
-             # by pickle.
-             if hasattr(_ui_handlers[h].event, "sendpickle"):
-                _ui_handlers[h].event.sendpickle((pickle.dumps(event)))
-             else:
-                _ui_handlers[h].event.send(event)
-        except:
-            errors.append(h)
-    for h in errors:
-        del _ui_handlers[h]
+    _ui_handlers_lock.acquire()
+    try:
+        for h in _ui_handlers:
+            #print "Sending event %s" % event
+            try:
+                 # We use pickle here since it better handles object instances
+                 # which xmlrpc's marshaller does not. Events *must* be serializable
+                 # by pickle.
+                 if hasattr(_ui_handlers[h].event, "sendpickle"):
+                    _ui_handlers[h].event.sendpickle((pickle.dumps(event)))
+                 else:
+                    _ui_handlers[h].event.send(event)
+            except:
+                errors.append(h)
+        for h in errors:
+            del _ui_handlers[h]
+    finally:
+        _ui_handlers_lock.release()
+
+def get_event_history():
+    _ui_event_history_lock.acquire()
+    evt_copy = _ui_event_history[:]
+    _ui_event_history_lock.release()
+    return evt_copy
 
 def fire(event, d):
     """Fire off an Event"""
@@ -199,13 +216,21 @@  def remove(name, handler):
     _handlers.pop(name)
 
 def register_UIHhandler(handler):
-    bb.event._ui_handler_seq = bb.event._ui_handler_seq + 1
-    _ui_handlers[_ui_handler_seq] = handler
+    _ui_handlers_lock.acquire()
+    try:
+        bb.event._ui_handler_seq = bb.event._ui_handler_seq + 1
+        _ui_handlers[_ui_handler_seq] = handler
+    finally:
+        _ui_handlers_lock.release()
     return _ui_handler_seq
 
 def unregister_UIHhandler(handlerNum):
-    if handlerNum in _ui_handlers:
-        del _ui_handlers[handlerNum]
+    _ui_handlers_lock.acquire()
+    try:
+        if handlerNum in _ui_handlers:
+            del _ui_handlers[handlerNum]
+    finally:
+        _ui_handlers_lock.release()
     return
 
 def getName(e):
diff --git a/lib/bb/server/xmlrpc.py b/lib/bb/server/xmlrpc.py
index 0b51ebd..0178bef 100644
--- a/lib/bb/server/xmlrpc.py
+++ b/lib/bb/server/xmlrpc.py
@@ -157,7 +157,7 @@  class BitBakeServerCommands():
         self.server = server
         self.has_client = False
 
-    def registerEventHandler(self, host, port):
+    def registerEventHandler(self, host, port, replay = False):
         """
         Register a remote UI Event Handler
         """
@@ -255,17 +255,25 @@  class BitBakeUIEventServer(threading.Thread):
             self.qlock.release()
             return e
 
-    def __init__(self, connection):
+    def __init__(self, connection, replay):
         self.connection = connection
         self.notify = threading.Event()
         self.event = BitBakeUIEventServer.EventAdapter(self.notify)
         self.quit = False
+        self.replay = replay
         threading.Thread.__init__(self)
 
     def terminateServer(self):
         self.quit = True
 
     def run(self):
+        # First send all events in the event history if requested
+        # by the client
+        if self.replay:
+           event_history = bb.event.get_event_history()
+           for evt in event_history:
+               self.connection.event.sendpickle(pickle.dumps(evt))
+           del event_history
         while not self.quit:
             self.notify.wait(0.1)
             evt = self.event.get()
@@ -278,14 +286,14 @@  class BitBakeXMLRPCEventServerController(SimpleXMLRPCServer, threading.Thread):
         threading.Thread.__init__(self)
         self.register_function(self.registerEventHandler, "registerEventHandler")
         self.register_function(self.unregisterEventHandler, "unregisterEventHandler")
-        self.register_function(self.terminateServer, "terminateServer")
+        #self.register_function(self.terminateServer, "terminateServer")
         #self.register_function(self.runCommand, "runCommand")
         self.quit = False
         self.clients = {}
         self.client_ui_ids = {}
         self.timeout = 1    # timeout for .handle_request()
 
-    def registerEventHandler(self, host, port):
+    def registerEventHandler(self, host, port, replay = False):
         """
         Register a remote UI Event Handler
         """
@@ -293,7 +301,7 @@  class BitBakeXMLRPCEventServerController(SimpleXMLRPCServer, threading.Thread):
         client_hash = "%s:%d" % (host, port)
         if self.clients.has_key(client_hash):
             return None
-        client_ui_server = BitBakeUIEventServer(connection)
+        client_ui_server = BitBakeUIEventServer(connection, replay)
         self.client_ui_ids[client_hash] = bb.event.register_UIHhandler(client_ui_server)
         client_ui_server.start()
         self.clients[client_hash] = client_ui_server
@@ -423,6 +431,33 @@  class XMLRPCServer(SimpleXMLRPCServer, BaseImplServer):
     def set_connection_token(self, token):
         self.connection_token = token
 
+
+class BitBakeObserverConnection(BitBakeBaseServerConnection):
+    def __init__(self, serverImpl, clientinfo , replay):
+        self.connection = xmlrpclib.ServerProxy("http://%s:%d/" % (serverImpl.host, serverImpl.port + 2), allow_none=True)
+        self.clientinfo = clientinfo
+        self.replay = replay
+
+    def connect(self):
+        self.events = uievent.BBUIEventQueue(self.connection, self.clientinfo, self.replay)
+        return self
+
+    def removeClient(self):
+        pass
+
+    def terminate(self):
+        # Don't wait for server indefinitely
+        import socket
+        socket.setdefaulttimeout(2)
+        try:
+            self.events.system_quit()
+        except:
+            pass
+        try:
+            self.connection.terminateServer()
+        except:
+            pass
+
 class BitBakeXMLRPCServerConnection(BitBakeBaseServerConnection):
     def __init__(self, serverImpl, clientinfo=("localhost", 0)):
         self.connection, self.transport = _create_server(serverImpl.host, serverImpl.port)
@@ -471,9 +506,12 @@  class BitBakeServer(BitBakeBaseServer):
         self.connection.transport.set_connection_token(token)
 
 class BitBakeXMLRPCClient(BitBakeBaseServer):
+    """ a BitBakeServer controller that just connects to a remote server
 
-    def __init__(self):
-        pass
+    """
+    def __init__(self, observer_only = False, replay = False):
+        self.observer_only = observer_only
+        self.replay = replay
 
     def saveConnectionDetails(self, remote):
         self.remote = remote
@@ -495,7 +533,10 @@  class BitBakeXMLRPCClient(BitBakeBaseServer):
         except:
             return None
         self.serverImpl = XMLRPCProxyServer(host, port)
-        self.connection = BitBakeXMLRPCServerConnection(self.serverImpl, (ip, 0))
+        if self.observer_only:
+            self.connection = BitBakeObserverConnection(self.serverImpl, (ip, 0), self.replay)
+        else:
+            self.connection = BitBakeXMLRPCServerConnection(self.serverImpl, (ip, 0))
         return self.connection.connect()
 
     def endSession(self):
diff --git a/lib/bb/ui/knotty.py b/lib/bb/ui/knotty.py
index 389c3cc..465203f 100644
--- a/lib/bb/ui/knotty.py
+++ b/lib/bb/ui/knotty.py
@@ -216,21 +216,28 @@  class TerminalFilter(object):
             fd = sys.stdin.fileno()
             self.termios.tcsetattr(fd, self.termios.TCSADRAIN, self.stdinbackup)
 
-def main(server, eventHandler, params, tf = TerminalFilter):
-
+def _log_settings_from_server(server):
     # Get values of variables which control our output
     includelogs, error = server.runCommand(["getVariable", "BBINCLUDELOGS"])
     if error:
         logger.error("Unable to get the value of BBINCLUDELOGS variable: %s" % error)
-        return 1
+        raise error
     loglines, error = server.runCommand(["getVariable", "BBINCLUDELOGS_LINES"])
     if error:
         logger.error("Unable to get the value of BBINCLUDELOGS_LINES variable: %s" % error)
-        return 1
+        raise error
     consolelogfile, error = server.runCommand(["getVariable", "BB_CONSOLELOG"])
     if error:
         logger.error("Unable to get the value of BB_CONSOLELOG variable: %s" % error)
-        return 1
+        raise error
+    return includelogs, loglines, consolelogfile
+
+def main(server, eventHandler, params, tf = TerminalFilter):
+
+    if params.observe_only:
+        includelogs, loglines, consolelogfile = None, None, None
+    else:
+        includelogs, loglines, consolelogfile = _log_settings_from_server(server)
 
     if sys.stdin.isatty() and sys.stdout.isatty():
         log_exec_tty = True
@@ -254,7 +261,7 @@  def main(server, eventHandler, params, tf = TerminalFilter):
         consolelog.setFormatter(conlogformat)
         logger.addHandler(consolelog)
 
-    try:
+    if not params.observe_only:
         params.updateFromServer(server)
         cmdline = params.parseActions()
         if not cmdline:
@@ -271,9 +278,7 @@  def main(server, eventHandler, params, tf = TerminalFilter):
         elif ret != True:
             logger.error("Command '%s' failed: returned %s" % (cmdline, ret))
             return 1
-    except xmlrpclib.Fault as x:
-        logger.error("XMLRPC Fault getting commandline:\n %s" % x)
-        return 1
+
 
     parseprogress = None
     cacheprogress = None
@@ -320,7 +325,7 @@  def main(server, eventHandler, params, tf = TerminalFilter):
                 elif event.levelno == format.WARNING:
                     warnings = warnings + 1
                 # For "normal" logging conditions, don't show note logs from tasks
-                # but do show them if the user has changed the default log level to 
+                # but do show them if the user has changed the default log level to
                 # include verbose/debug messages
                 if event.taskpid != 0 and event.levelno <= format.NOTE:
                     continue
@@ -469,12 +474,12 @@  def main(server, eventHandler, params, tf = TerminalFilter):
                 pass
         except KeyboardInterrupt:
             termfilter.clearFooter()
-            if main.shutdown == 1:
+            if not params.observe_only and main.shutdown == 1:
                 print("\nSecond Keyboard Interrupt, stopping...\n")
                 _, error = server.runCommand(["stateStop"])
                 if error:
                     logger.error("Unable to cleanly stop: %s" % error)
-            if main.shutdown == 0:
+            if not params.observe_only and main.shutdown == 0:
                 print("\nKeyboard Interrupt, closing down...\n")
                 interrupted = True
                 _, error = server.runCommand(["stateShutdown"])
diff --git a/lib/bb/ui/uievent.py b/lib/bb/ui/uievent.py
index 0b9a836..53a5f63 100644
--- a/lib/bb/ui/uievent.py
+++ b/lib/bb/ui/uievent.py
@@ -28,7 +28,7 @@  import socket, threading, pickle
 from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
 
 class BBUIEventQueue:
-    def __init__(self, BBServer, clientinfo=("localhost, 0")):
+    def __init__(self, BBServer, clientinfo=("localhost, 0"), replay = False):
 
         self.eventQueue = []
         self.eventQueueLock = threading.Lock()
@@ -44,7 +44,7 @@  class BBUIEventQueue:
         server.register_function( self.send_event, "event.sendpickle" )
         server.socket.settimeout(1)
 
-        self.EventHandle = self.BBServer.registerEventHandler(self.host, self.port)
+        self.EventHandle = self.BBServer.registerEventHandler(self.host, self.port, replay)
 
         self.server = server