[RFC,1/2] sstate: use the python3 ThreadPoolExecutor instead of the OE ThreadedPool

Message ID 20220416202401.179351-1-quaresma.jose@gmail.com
State New
Headers show
Series [RFC,1/2] sstate: use the python3 ThreadPoolExecutor instead of the OE ThreadedPool | expand

Commit Message

Jose Quaresma April 16, 2022, 8:24 p.m. UTC
for the FetchConnectionCache use a queue where each thread can
get an unsed connection_cache that is properly initialized before
we fireup the ThreadPoolExecutor.

for the progress bar we need an adictional task counter
that is protected with thread lock as it runs inside the
ThreadPoolExecutor.

Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775

Signed-off-by: Jose Quaresma <quaresma.jose@gmail.com>
---
 meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
 1 file changed, 28 insertions(+), 16 deletions(-)

Comments

Richard Purdie April 16, 2022, 9:57 p.m. UTC | #1
On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> for the FetchConnectionCache use a queue where each thread can
> get an unsed connection_cache that is properly initialized before
> we fireup the ThreadPoolExecutor.
> 
> for the progress bar we need an adictional task counter
> that is protected with thread lock as it runs inside the
> ThreadPoolExecutor.
> 
> Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
> 
> Signed-off-by: Jose Quaresma <quaresma.jose@gmail.com>
> ---
>  meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
>  1 file changed, 28 insertions(+), 16 deletions(-)

Are there specific issues you see with oe.utils.ThreadedPool that this change
addresses? Were you able to reproduce the issue in 14775?

I'm a little concerned we swap one implementation where we know roughly what the
issues are for another where we dont :/.

I notice that ThreadPoolExecutor can take an initializer but you're doing this
using the queue instead. Is that because you suspect some issue with those being
setup in the separate threads?

You also mentioned the debug messages not showing. That suggests something is
wrong with the event handlers in the new threading model and that errors
wouldn't propagate either so we need to check into that.

This is definitely an interesting idea but I'm nervous about it :/.

Cheers,

Richard



> diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
> index 1c0cae4893..0ede078770 100644
> --- a/meta/classes/sstate.bbclass
> +++ b/meta/classes/sstate.bbclass
> @@ -977,15 +977,22 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
>              localdata.delVar('BB_NO_NETWORK')
>  
>          from bb.fetch2 import FetchConnectionCache
> -        def checkstatus_init(thread_worker):
> -            thread_worker.connection_cache = FetchConnectionCache()
> -
> -        def checkstatus_end(thread_worker):
> -            thread_worker.connection_cache.close_connections()
> -
> -        def checkstatus(thread_worker, arg):
> +        def checkstatus_init():
> +            while not connection_cache_pool.full():
> +                connection_cache_pool.put(FetchConnectionCache())
> +
> +        def checkstatus_end():
> +            while not connection_cache_pool.empty():
> +                connection_cache = connection_cache_pool.get()
> +                connection_cache.close_connections()
> +
> +        import threading
> +        _lock = threading.Lock()
> +        def checkstatus(arg):
>              (tid, sstatefile) = arg
>  
> +            connection_cache = connection_cache_pool.get()
> +
>              localdata2 = bb.data.createCopy(localdata)
>              srcuri = "file://" + sstatefile
>              localdata2.setVar('SRC_URI', srcuri)
> @@ -995,7 +1002,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
>  
>              try:
>                  fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
> -                            connection_cache=thread_worker.connection_cache)
> +                            connection_cache=connection_cache)
>                  fetcher.checkstatus()
>                  bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
>                  found.add(tid)
> @@ -1005,8 +1012,12 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
>              except Exception as e:
>                  bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc()))
>  
> +            connection_cache_pool.put(connection_cache)
> +
>              if progress:
> -                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d)
> +                with _lock:
> +                    tasks -= 1
> +                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - tasks), d)
>  
>          tasklist = []
>          for tid in missed:
> @@ -1016,6 +1027,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
>          if tasklist:
>              nproc = min(int(d.getVar("BB_NUMBER_THREADS")), len(tasklist))
>  
> +            tasks = len(tasklist)
>              progress = len(tasklist) >= 100
>              if progress:
>                  msg = "Checking sstate mirror object availability"
> @@ -1025,13 +1037,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
>              fetcherenv = bb.fetch2.get_fetcher_environment(d)
>              with bb.utils.environment(**fetcherenv):
>                  bb.event.enable_threadlock()
> -                pool = oe.utils.ThreadedPool(nproc, len(tasklist),
> -                        worker_init=checkstatus_init, worker_end=checkstatus_end,
> -                        name="sstate_checkhashes-")
> -                for t in tasklist:
> -                    pool.add_task(checkstatus, t)
> -                pool.start()
> -                pool.wait_completion()
> +                import concurrent.futures
> +                from queue import Queue
> +                connection_cache_pool = Queue(nproc)
> +                checkstatus_init()
> +                with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
> +                    executor.map(checkstatus, tasklist)
> +                checkstatus_end()
>                  bb.event.disable_threadlock()
>  
>              if progress:
> -=-=-=-=-=-=-=-=-=-=-=-
> Links: You receive all messages sent to this group.
> View/Reply Online (#164558): https://lists.openembedded.org/g/openembedded-core/message/164558
> Mute This Topic: https://lists.openembedded.org/mt/90512350/1686473
> Group Owner: openembedded-core+owner@lists.openembedded.org
> Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub [richard.purdie@linuxfoundation.org]
> -=-=-=-=-=-=-=-=-=-=-=-
>
Jose Quaresma April 16, 2022, 10:27 p.m. UTC | #2
Richard Purdie <richard.purdie@linuxfoundation.org> escreveu no dia sábado,
16/04/2022 à(s) 22:57:

> On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> > for the FetchConnectionCache use a queue where each thread can
> > get an unsed connection_cache that is properly initialized before
> > we fireup the ThreadPoolExecutor.
> >
> > for the progress bar we need an adictional task counter
> > that is protected with thread lock as it runs inside the
> > ThreadPoolExecutor.
> >
> > Fixes [YOCTO #14775] --
> https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
> >
> > Signed-off-by: Jose Quaresma <quaresma.jose@gmail.com>
> > ---
> >  meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
> >  1 file changed, 28 insertions(+), 16 deletions(-)
>
> Are there specific issues you see with oe.utils.ThreadedPool that this
> change
> addresses? Were you able to reproduce the issue in 14775?
>

Looking deeper while testing the patch I think I found another bug in the
sstate mirror handling.
The python set() is not thread safe and we use it inside the thread pool, I
added a new python set class for that in my V2

I don't know if it is related to 14775 but it can be, I can't reproduce the
14775 on my side,
maybe it's better to remove the 14775 mention from my commits, what do you
think?


> I'm a little concerned we swap one implementation where we know roughly
> what the
> issues are for another where we dont :/.
>

I think there some issues on ThreadedPool in the worker_init and worker_end,
this functions is called in all workers and it seems to me that the
right thing to do
is calling and reuse the previous ones connection_cache otherwise the
connection_cache
does nothing.


>
> I notice that ThreadPoolExecutor can take an initializer but you're doing
> this
> using the queue instead. Is that because you suspect some issue with those
> being
> setup in the separate threads?
>

I am using a queue as it is the easy way I find for reusing the
FetchConnectionCache.


>
> You also mentioned the debug messages not showing. That suggests something
> is
> wrong with the event handlers in the new threading model and that errors
> wouldn't propagate either so we need to check into that.
>

This is fixed in V2


>
> This is definitely an interesting idea but I'm nervous about it :/.
>

It would be interesting if you could test it in the autobuilder.
On my side it is working well now, I will send a V2

Jose


>
> Cheers,
>
> Richard
>
>
>
> > diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
> > index 1c0cae4893..0ede078770 100644
> > --- a/meta/classes/sstate.bbclass
> > +++ b/meta/classes/sstate.bbclass
> > @@ -977,15 +977,22 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
> >              localdata.delVar('BB_NO_NETWORK')
> >
> >          from bb.fetch2 import FetchConnectionCache
> > -        def checkstatus_init(thread_worker):
> > -            thread_worker.connection_cache = FetchConnectionCache()
> > -
> > -        def checkstatus_end(thread_worker):
> > -            thread_worker.connection_cache.close_connections()
> > -
> > -        def checkstatus(thread_worker, arg):
> > +        def checkstatus_init():
> > +            while not connection_cache_pool.full():
> > +                connection_cache_pool.put(FetchConnectionCache())
> > +
> > +        def checkstatus_end():
> > +            while not connection_cache_pool.empty():
> > +                connection_cache = connection_cache_pool.get()
> > +                connection_cache.close_connections()
> > +
> > +        import threading
> > +        _lock = threading.Lock()
> > +        def checkstatus(arg):
> >              (tid, sstatefile) = arg
> >
> > +            connection_cache = connection_cache_pool.get()
> > +
> >              localdata2 = bb.data.createCopy(localdata)
> >              srcuri = "file://" + sstatefile
> >              localdata2.setVar('SRC_URI', srcuri)
> > @@ -995,7 +1002,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
> >
> >              try:
> >                  fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
> > -
> connection_cache=thread_worker.connection_cache)
> > +                            connection_cache=connection_cache)
> >                  fetcher.checkstatus()
> >                  bb.debug(2, "SState: Successful fetch test for %s" %
> srcuri)
> >                  found.add(tid)
> > @@ -1005,8 +1012,12 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
> >              except Exception as e:
> >                  bb.error("SState: cannot test %s: %s\n%s" % (srcuri,
> repr(e), traceback.format_exc()))
> >
> > +            connection_cache_pool.put(connection_cache)
> > +
> >              if progress:
> > -                bb.event.fire(bb.event.ProcessProgress(msg,
> len(tasklist) - thread_worker.tasks.qsize()), d)
> > +                with _lock:
> > +                    tasks -= 1
> > +                bb.event.fire(bb.event.ProcessProgress(msg,
> len(tasklist) - tasks), d)
> >
> >          tasklist = []
> >          for tid in missed:
> > @@ -1016,6 +1027,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
> >          if tasklist:
> >              nproc = min(int(d.getVar("BB_NUMBER_THREADS")),
> len(tasklist))
> >
> > +            tasks = len(tasklist)
> >              progress = len(tasklist) >= 100
> >              if progress:
> >                  msg = "Checking sstate mirror object availability"
> > @@ -1025,13 +1037,13 @@ def sstate_checkhashes(sq_data, d,
> siginfo=False, currentcount=0, summary=True,
> >              fetcherenv = bb.fetch2.get_fetcher_environment(d)
> >              with bb.utils.environment(**fetcherenv):
> >                  bb.event.enable_threadlock()
> > -                pool = oe.utils.ThreadedPool(nproc, len(tasklist),
> > -                        worker_init=checkstatus_init,
> worker_end=checkstatus_end,
> > -                        name="sstate_checkhashes-")
> > -                for t in tasklist:
> > -                    pool.add_task(checkstatus, t)
> > -                pool.start()
> > -                pool.wait_completion()
> > +                import concurrent.futures
> > +                from queue import Queue
> > +                connection_cache_pool = Queue(nproc)
> > +                checkstatus_init()
> > +                with
> concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
> > +                    executor.map(checkstatus, tasklist)
> > +                checkstatus_end()
> >                  bb.event.disable_threadlock()
> >
> >              if progress:
> > -=-=-=-=-=-=-=-=-=-=-=-
> > Links: You receive all messages sent to this group.
> > View/Reply Online (#164558):
> https://lists.openembedded.org/g/openembedded-core/message/164558
> > Mute This Topic: https://lists.openembedded.org/mt/90512350/1686473
> > Group Owner: openembedded-core+owner@lists.openembedded.org
> > Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub [
> richard.purdie@linuxfoundation.org]
> > -=-=-=-=-=-=-=-=-=-=-=-
> >
>
>
>
Richard Purdie April 17, 2022, 7:57 a.m. UTC | #3
On Sat, 2022-04-16 at 23:27 +0100, Jose Quaresma wrote:
> 
> 
> Richard Purdie <richard.purdie@linuxfoundation.org> escreveu no dia sábado,
> 16/04/2022 à(s) 22:57:
> > On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> > > for the FetchConnectionCache use a queue where each thread can
> > > get an unsed connection_cache that is properly initialized before
> > > we fireup the ThreadPoolExecutor.
> > > 
> > > for the progress bar we need an adictional task counter
> > > that is protected with thread lock as it runs inside the
> > > ThreadPoolExecutor.
> > > 
> > > Fixes [YOCTO #14775] --
> > https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
> > > 
> > > Signed-off-by: Jose Quaresma <quaresma.jose@gmail.com>
> > > ---
> > >   meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
> > >   1 file changed, 28 insertions(+), 16 deletions(-)
> > 
> > Are there specific issues you see with oe.utils.ThreadedPool that this
> > change
> > addresses? Were you able to reproduce the issue in 14775?
> > 
> 
> 
> Looking deeper while testing the patch I think I found another bug in the
> sstate mirror handling.
> The python set() is not thread safe and we use it inside the thread pool, I
> added a new python set class for that in my V2 

That might explain things.

> I don't know if it is related to 14775 but it can be, I can't reproduce the
> 14775 on my side, maybe it's better to remove the 14775 mention from my
> commits, what do you think?

I think you shouldn't say it fixes it as we simply don't know that. It may be
related to so perhaps say that instead.

> > I'm a little concerned we swap one implementation where we know roughly what
> > the issues are for another where we dont :/.
> > 
> 
> I think there some issues on ThreadedPool in the worker_init and worker_end,
> this functions is called in all workers and it seems to me that the
> right thing to do is calling and reuse the previous ones connection_cache
> otherwise the connection_cache does nothing.

It creates a connection_cache in each thread that is created. Once created in a
given thread, that connection cache is reused there? I'm not sure you can say it
does nothing?

>  
> > 
> > I notice that ThreadPoolExecutor can take an initializer but you're doing
> > this
> > using the queue instead. Is that because you suspect some issue with those
> > being
> > setup in the separate threads?
> > 
> 
> 
> I am using a queue as it is the easy way I find for reusing the
> FetchConnectionCache.

I think that piece of code was working already?

>  
> > 
> > You also mentioned the debug messages not showing. That suggests something
> > is
> > wrong with the event handlers in the new threading model and that errors
> > wouldn't propagate either so we need to check into that.
> > 
> 
> 
> This is fixed in V2


What was the issue out of interest?

>  
> > 
> > This is definitely an interesting idea but I'm nervous about it :/.
> > 
> 
> 
> It would be interesting if you could test it in the autobuilder.
> On my side it is working well now, I will send a V2

The challenge with autobuilder testing of this is that there is only a small
portion of the autobuilder tests which exercise this code (testsdkext for
images). The current issues only occur intermittently so it is hard to know if
any given change fixes anything (or introduces a new race).

One more interesting test which may more quickly find issues would be to make
everything use the http mirror on the autobuilder I guess. We'd need to figure
out the configuration for that though.

Cheers,

Richard
Jose Quaresma April 18, 2022, 5:52 a.m. UTC | #4
Richard Purdie <richard.purdie@linuxfoundation.org> escreveu no dia
domingo, 17/04/2022 à(s) 08:57:

> On Sat, 2022-04-16 at 23:27 +0100, Jose Quaresma wrote:
> >
> >
> > Richard Purdie <richard.purdie@linuxfoundation.org> escreveu no dia
> sábado,
> > 16/04/2022 à(s) 22:57:
> > > On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> > > > for the FetchConnectionCache use a queue where each thread can
> > > > get an unsed connection_cache that is properly initialized before
> > > > we fireup the ThreadPoolExecutor.
> > > >
> > > > for the progress bar we need an adictional task counter
> > > > that is protected with thread lock as it runs inside the
> > > > ThreadPoolExecutor.
> > > >
> > > > Fixes [YOCTO #14775] --
> > > https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
> > > >
> > > > Signed-off-by: Jose Quaresma <quaresma.jose@gmail.com>
> > > > ---
> > > >   meta/classes/sstate.bbclass | 44
> +++++++++++++++++++++++--------------
> > > >   1 file changed, 28 insertions(+), 16 deletions(-)
> > >
> > > Are there specific issues you see with oe.utils.ThreadedPool that this
> > > change
> > > addresses? Were you able to reproduce the issue in 14775?
> > >
> >
> >
> > Looking deeper while testing the patch I think I found another bug in the
> > sstate mirror handling.
> > The python set() is not thread safe and we use it inside the thread
> pool, I
> > added a new python set class for that in my V2
>
> That might explain things.
>
> > I don't know if it is related to 14775 but it can be, I can't reproduce
> the
> > 14775 on my side, maybe it's better to remove the 14775 mention from my
> > commits, what do you think?
>
> I think you shouldn't say it fixes it as we simply don't know that. It may
> be
> related to so perhaps say that instead.
>

I will do that.


>
> > > I'm a little concerned we swap one implementation where we know
> roughly what
> > > the issues are for another where we dont :/.
> > >
> >
> > I think there some issues on ThreadedPool in the worker_init and
> worker_end,
> > this functions is called in all workers and it seems to me that the
> > right thing to do is calling and reuse the previous ones connection_cache
> > otherwise the connection_cache does nothing.
>
> It creates a connection_cache in each thread that is created. Once created
> in a
> given thread, that connection cache is reused there? I'm not sure you can
> say it
> does nothing?
>

I may have misunderstood this part and you may be right. I have to
re-analyze more carefully.


>
> >
> > >
> > > I notice that ThreadPoolExecutor can take an initializer but you're
> doing
> > > this
> > > using the queue instead. Is that because you suspect some issue with
> those
> > > being
> > > setup in the separate threads?
> > >
> >
> >
> > I am using a queue as it is the easy way I find for reusing the
> > FetchConnectionCache.
>
> I think that piece of code was working already?
>

it may be working fine on OE ThreadPool and I misunderstood that


>
> >
> > >
> > > You also mentioned the debug messages not showing. That suggests
> something
> > > is
> > > wrong with the event handlers in the new threading model and that
> errors
> > > wouldn't propagate either so we need to check into that.
> > >
> >
> >
> > This is fixed in V2
>
>
> What was the issue out of interest?
>

I don't know but it starts working when I add the thread safe collections.


>
> >
> > >
> > > This is definitely an interesting idea but I'm nervous about it :/.
> > >
> >
> >
> > It would be interesting if you could test it in the autobuilder.
> > On my side it is working well now, I will send a V2
>
> The challenge with autobuilder testing of this is that there is only a
> small
> portion of the autobuilder tests which exercise this code (testsdkext for
> images). The current issues only occur intermittently so it is hard to
> know if
> any given change fixes anything (or introduces a new race).
>
> One more interesting test which may more quickly find issues would be to
> make
> everything use the http mirror on the autobuilder I guess. We'd need to
> figure
> out the configuration for that though.
>

What you mean by this is that there are builds on the autobuilder that uses
the sstate mirror
and others that use some shared sstate cache filesystem?

As you previously said and as the sstate mirror is available for the
community,
I think and I will try to add some tests for that.
I still don't know how to do it but I'll think about it.

Another thing about this RFC series is that I think I need to do it in a way
that it can be backported for dunfell if we need to do that.

I will spend more time on this during this week.
Thanks for your always valuable comments.

Jose


> Cheers,
>
> Richard
>
>
>
>

Patch

diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1c0cae4893..0ede078770 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -977,15 +977,22 @@  def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             localdata.delVar('BB_NO_NETWORK')
 
         from bb.fetch2 import FetchConnectionCache
-        def checkstatus_init(thread_worker):
-            thread_worker.connection_cache = FetchConnectionCache()
-
-        def checkstatus_end(thread_worker):
-            thread_worker.connection_cache.close_connections()
-
-        def checkstatus(thread_worker, arg):
+        def checkstatus_init():
+            while not connection_cache_pool.full():
+                connection_cache_pool.put(FetchConnectionCache())
+
+        def checkstatus_end():
+            while not connection_cache_pool.empty():
+                connection_cache = connection_cache_pool.get()
+                connection_cache.close_connections()
+
+        import threading
+        _lock = threading.Lock()
+        def checkstatus(arg):
             (tid, sstatefile) = arg
 
+            connection_cache = connection_cache_pool.get()
+
             localdata2 = bb.data.createCopy(localdata)
             srcuri = "file://" + sstatefile
             localdata2.setVar('SRC_URI', srcuri)
@@ -995,7 +1002,7 @@  def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
 
             try:
                 fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
-                            connection_cache=thread_worker.connection_cache)
+                            connection_cache=connection_cache)
                 fetcher.checkstatus()
                 bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
                 found.add(tid)
@@ -1005,8 +1012,12 @@  def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             except Exception as e:
                 bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc()))
 
+            connection_cache_pool.put(connection_cache)
+
             if progress:
-                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d)
+                with _lock:
+                    tasks -= 1
+                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - tasks), d)
 
         tasklist = []
         for tid in missed:
@@ -1016,6 +1027,7 @@  def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
         if tasklist:
             nproc = min(int(d.getVar("BB_NUMBER_THREADS")), len(tasklist))
 
+            tasks = len(tasklist)
             progress = len(tasklist) >= 100
             if progress:
                 msg = "Checking sstate mirror object availability"
@@ -1025,13 +1037,13 @@  def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             fetcherenv = bb.fetch2.get_fetcher_environment(d)
             with bb.utils.environment(**fetcherenv):
                 bb.event.enable_threadlock()
-                pool = oe.utils.ThreadedPool(nproc, len(tasklist),
-                        worker_init=checkstatus_init, worker_end=checkstatus_end,
-                        name="sstate_checkhashes-")
-                for t in tasklist:
-                    pool.add_task(checkstatus, t)
-                pool.start()
-                pool.wait_completion()
+                import concurrent.futures
+                from queue import Queue
+                connection_cache_pool = Queue(nproc)
+                checkstatus_init()
+                with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
+                    executor.map(checkstatus, tasklist)
+                checkstatus_end()
                 bb.event.disable_threadlock()
 
             if progress: