diff mbox series

[bitbake-devel,5/5] siggen: Add parallel query API

Message ID 20240218200743.2982923-6-JPEWhacker@gmail.com
State New
Headers show
Series Implement parallel Query API | expand

Commit Message

Joshua Watt Feb. 18, 2024, 8:07 p.m. UTC
Implements a new API called get_unihashes() that allows for querying
multiple unihashes in parallel.

The API is also reworked to make it easier for derived classes to
interface with the new API in a consistent manner. Instead of overriding
get_unihash() to add custom handling for local hash calculating (e.g.
caches) derived classes should now override get_cached_unihash(), and
return the local unihash or None if there isn't one.

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
---
 bitbake/lib/bb/siggen.py | 121 ++++++++++++++++++++++++++++-----------
 1 file changed, 87 insertions(+), 34 deletions(-)
diff mbox series

Patch

diff --git a/bitbake/lib/bb/siggen.py b/bitbake/lib/bb/siggen.py
index 58854aee76c..e1a4fa2cdd1 100644
--- a/bitbake/lib/bb/siggen.py
+++ b/bitbake/lib/bb/siggen.py
@@ -102,9 +102,18 @@  class SignatureGenerator(object):
             if flag:
                 self.datacaches[mc].stamp_extrainfo[mcfn][t] = flag
 
+    def get_cached_unihash(self, tid):
+        return None
+
     def get_unihash(self, tid):
+        unihash = self.get_cached_unihash(tid)
+        if unihash:
+            return unihash
         return self.taskhash[tid]
 
+    def get_unihashes(self, tids):
+        return {tid: self.get_unihash(tid) for tid in tids}
+
     def prep_taskhash(self, tid, deps, dataCaches):
         return
 
@@ -524,28 +533,37 @@  class SignatureGeneratorUniHashMixIn(object):
         super().__init__(data)
 
     def get_taskdata(self):
-        return (self.server, self.method, self.extramethod) + super().get_taskdata()
+        return (self.server, self.method, self.extramethod, self.max_parallel) + super().get_taskdata()
 
     def set_taskdata(self, data):
-        self.server, self.method, self.extramethod = data[:3]
-        super().set_taskdata(data[3:])
+        self.server, self.method, self.extramethod, self.max_parallel = data[:4]
+        super().set_taskdata(data[4:])
 
     def client(self):
         if getattr(self, '_client', None) is None:
             self._client = hashserv.create_client(self.server)
         return self._client
 
+    def client_pool(self):
+        if getattr(self, '_client_pool', None) is None:
+            self._client_pool = hashserv.client.ClientPool(self.server, self.max_parallel)
+        return self._client_pool
+
     def reset(self, data):
-        if getattr(self, '_client', None) is not None:
-            self._client.close()
-            self._client = None 
+        self.__close_clients()
         return super().reset(data)
 
     def exit(self):
+        self.__close_clients()
+        return super().exit()
+
+    def __close_clients(self):
         if getattr(self, '_client', None) is not None:
             self._client.close()
             self._client = None
-        return super().exit()
+        if getattr(self, '_client_pool', None) is not None:
+            self._client_pool.close()
+            self._client_pool = None
 
     def get_stampfile_hash(self, tid):
         if tid in self.taskhash:
@@ -578,7 +596,7 @@  class SignatureGeneratorUniHashMixIn(object):
             return None
         return unihash
 
-    def get_unihash(self, tid):
+    def get_cached_unihash(self, tid):
         taskhash = self.taskhash[tid]
 
         # If its not a setscene task we can return
@@ -593,40 +611,74 @@  class SignatureGeneratorUniHashMixIn(object):
             self.unihash[tid] = unihash
             return unihash
 
-        # In the absence of being able to discover a unique hash from the
-        # server, make it be equivalent to the taskhash. The unique "hash" only
-        # really needs to be a unique string (not even necessarily a hash), but
-        # making it match the taskhash has a few advantages:
-        #
-        # 1) All of the sstate code that assumes hashes can be the same
-        # 2) It provides maximal compatibility with builders that don't use
-        #    an equivalency server
-        # 3) The value is easy for multiple independent builders to derive the
-        #    same unique hash from the same input. This means that if the
-        #    independent builders find the same taskhash, but it isn't reported
-        #    to the server, there is a better chance that they will agree on
-        #    the unique hash.
-        unihash = taskhash
+        return None
 
-        try:
-            method = self.method
-            if tid in self.extramethod:
-                method = method + self.extramethod[tid]
-            data = self.client().get_unihash(method, self.taskhash[tid])
-            if data:
-                unihash = data
+    def _get_method(self, tid):
+        method = self.method
+        if tid in self.extramethod:
+            method = method + self.extramethod[tid]
+
+        return method
+
+    def get_unihash(self, tid):
+        return self.get_unihashes([tid])[tid]
+
+    def get_unihashes(self, tids):
+        """
+        For a iterable of tids, returns a dictionary that maps each tid to a
+        unihash
+        """
+        result = {}
+        queries = {}
+        query_result = {}
+
+        for tid in tids:
+            unihash = self.get_cached_unihash(tid)
+            if unihash:
+                result[tid] = unihash
+            else:
+                queries[tid] = (self._get_method(tid), self.taskhash[tid])
+
+        if len(queries) == 0:
+            return result
+
+        if self.max_parallel <= 1 or len(queries) <= 1:
+            # No parallelism required. Make the query serially with the single client
+            for tid, args in queries.items():
+                query_result[tid] = self.client().get_unihash(*args)
+        else:
+            query_result = self.client_pool().get_unihashes(queries)
+
+        for tid, unihash in query_result.items():
+            # In the absence of being able to discover a unique hash from the
+            # server, make it be equivalent to the taskhash. The unique "hash" only
+            # really needs to be a unique string (not even necessarily a hash), but
+            # making it match the taskhash has a few advantages:
+            #
+            # 1) All of the sstate code that assumes hashes can be the same
+            # 2) It provides maximal compatibility with builders that don't use
+            #    an equivalency server
+            # 3) The value is easy for multiple independent builders to derive the
+            #    same unique hash from the same input. This means that if the
+            #    independent builders find the same taskhash, but it isn't reported
+            #    to the server, there is a better chance that they will agree on
+            #    the unique hash.
+            taskhash = self.taskhash[tid]
+            if unihash:
                 # A unique hash equal to the taskhash is not very interesting,
                 # so it is reported it at debug level 2. If they differ, that
                 # is much more interesting, so it is reported at debug level 1
                 hashequiv_logger.bbdebug((1, 2)[unihash == taskhash], 'Found unihash %s in place of %s for %s from %s' % (unihash, taskhash, tid, self.server))
             else:
                 hashequiv_logger.debug2('No reported unihash for %s:%s from %s' % (tid, taskhash, self.server))
-        except ConnectionError as e:
-            bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e)))
+                unihash = taskhash
 
-        self.set_unihash(tid, unihash)
-        self.unihash[tid] = unihash
-        return unihash
+
+            self.set_unihash(tid, unihash)
+            self.unihash[tid] = unihash
+            result[tid] = unihash
+
+        return result
 
     def report_unihash(self, path, task, d):
         import importlib
@@ -754,6 +806,7 @@  class SignatureGeneratorTestEquivHash(SignatureGeneratorUniHashMixIn, SignatureG
         super().init_rundepcheck(data)
         self.server = data.getVar('BB_HASHSERVE')
         self.method = "sstate_output_hash"
+        self.max_parallel = 1
 
 def clean_checksum_file_path(file_checksum_tuple):
     f, cs = file_checksum_tuple