diff mbox series

hashserv: Retry and sleep if database is locked on INSERT

Message ID 20230921100538.154753-1-tobias.hagelborn@axis.com
State New
Headers show
Series hashserv: Retry and sleep if database is locked on INSERT | expand

Commit Message

Tobias Hagelborn Sept. 21, 2023, 10:05 a.m. UTC
From: Tobias Hagelborn <tobiasha@axis.com>

Retry insert operations in case the database is locked by
an external process — for instance, an external cleanup or data
retention transaction. Use async sleep to avoid blocking the event loop.

Signed-off-by: Tobias Hagelborn <tobias.hagelborn@axis.com>
---
 lib/hashserv/server.py | 41 +++++++++++++++++++++++++++--------------
 1 file changed, 27 insertions(+), 14 deletions(-)

Comments

Joshua Watt Sept. 21, 2023, 4:14 p.m. UTC | #1
On Thu, Sep 21, 2023 at 4:08 AM Tobias Hagelborn
<tobias.hagelborn@axis.com> wrote:
>
> From: Tobias Hagelborn <tobiasha@axis.com>
>
> Retry insert operations in case the database is locked by
> an external process. For instance an external cleanup or data
> retention transaction. Use async sleep to not block the event loop.

sqlite already handles this internally with a timeout specified in
sqlite3.connect(). The default is 5 seconds; I think it would be
better to add a command line option to the server that allows a longer
timeout to be specified instead of manually retrying.

Allowing multiple queries to run in parallel (a side effect of async)
might mess up the cursor.lastrowid tracking, so I'm a little leery of
doing that. The long blocks should only actually happen when you are
doing long maintenance operations, so an option for a longer timeout
on the server is probably better (and, maybe rework your cleanup to
not lock the database for so long)


>
> Signed-off-by: Tobias Hagelborn <tobias.hagelborn@axis.com>
> ---
>  lib/hashserv/server.py | 41 +++++++++++++++++++++++++++--------------
>  1 file changed, 27 insertions(+), 14 deletions(-)
>
> diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py
> index d40a2ab8..c898be3f 100644
> --- a/lib/hashserv/server.py
> +++ b/lib/hashserv/server.py
> @@ -9,6 +9,7 @@ import enum
>  import asyncio
>  import logging
>  import math
> +import sqlite3
>  import time
>  from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
>  import bb.asyncrpc
> @@ -114,7 +115,7 @@ class Resolve(enum.Enum):
>      REPLACE = enum.auto()
>
>
> -def insert_table(cursor, table, data, on_conflict):
> +async def insert_table(cursor, table, data, on_conflict):
>      resolve = {
>          Resolve.FAIL: "",
>          Resolve.IGNORE: " OR IGNORE",
> @@ -129,7 +130,19 @@ def insert_table(cursor, table, data, on_conflict):
>          values=", ".join(":" + k for k in keys),
>      )
>      prevrowid = cursor.lastrowid
> -    cursor.execute(query, data)
> +
> +    RETRIES = 5
> +    for x in range(RETRIES):
> +        try:
> +            cursor.execute(query, data)
> +        except sqlite3.OperationalError as e:
> +            if "database is locked" in str(e):
> +                await asyncio.sleep(1)
> +        finally:
> +            break
> +    else:
> +        cursor.execute(query, data)
> +
>      logging.debug(
>          "Inserting %r into %s, %s",
>          data,
> @@ -138,17 +151,17 @@ def insert_table(cursor, table, data, on_conflict):
>      )
>      return (cursor.lastrowid, cursor.lastrowid != prevrowid)
>
> -def insert_unihash(cursor, data, on_conflict):
> -    return insert_table(cursor, "unihashes_v2", data, on_conflict)
> +async def insert_unihash(cursor, data, on_conflict):
> +    return await insert_table(cursor, "unihashes_v2", data, on_conflict)
>
> -def insert_outhash(cursor, data, on_conflict):
> -    return insert_table(cursor, "outhashes_v2", data, on_conflict)
> +async def insert_outhash(cursor, data, on_conflict):
> +    return await insert_table(cursor, "outhashes_v2", data, on_conflict)
>
>  async def copy_unihash_from_upstream(client, db, method, taskhash):
>      d = await client.get_taskhash(method, taskhash)
>      if d is not None:
>          with closing(db.cursor()) as cursor:
> -            insert_unihash(
> +            await insert_unihash(
>                  cursor,
>                  {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
>                  Resolve.IGNORE,
> @@ -260,7 +273,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
>              elif self.upstream_client is not None:
>                  d = await self.upstream_client.get_taskhash(method, taskhash)
>                  d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
> -                insert_unihash(cursor, d, Resolve.IGNORE)
> +                await insert_unihash(cursor, d, Resolve.IGNORE)
>                  self.db.commit()
>
>          return d
> @@ -301,16 +314,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
>
>          return d
>
> -    def update_unified(self, cursor, data):
> +    async def update_unified(self, cursor, data):
>          if data is None:
>              return
>
> -        insert_unihash(
> +        await insert_unihash(
>              cursor,
>              {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
>              Resolve.IGNORE
>          )
> -        insert_outhash(
> +        await insert_outhash(
>              cursor,
>              {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
>              Resolve.IGNORE
> @@ -386,7 +399,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
>                      outhash_data[k] = data[k]
>
>              # Insert the new entry, unless it already exists
> -            (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)
> +            (rowid, inserted) = await insert_outhash(cursor, outhash_data, Resolve.IGNORE)
>
>              if inserted:
>                  # If this row is new, check if it is equivalent to another
> @@ -427,7 +440,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
>                              unihash = upstream_data['unihash']
>
>
> -                insert_unihash(
> +                await insert_unihash(
>                      cursor,
>                      {
>                          'method': data['method'],
> @@ -460,7 +473,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
>                  'taskhash': data['taskhash'],
>                  'unihash': data['unihash'],
>              }
> -            insert_unihash(cursor, insert_data, Resolve.IGNORE)
> +            await insert_unihash(cursor, insert_data, Resolve.IGNORE)
>              self.db.commit()
>
>              # Fetch the unihash that will be reported for the taskhash. If the
> --
> 2.30.2
>
>
> -=-=-=-=-=-=-=-=-=-=-=-
> Links: You receive all messages sent to this group.
> View/Reply Online (#187987): https://lists.openembedded.org/g/openembedded-core/message/187987
> Mute This Topic: https://lists.openembedded.org/mt/101496954/3616693
> Group Owner: openembedded-core+owner@lists.openembedded.org
> Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub [JPEWhacker@gmail.com]
> -=-=-=-=-=-=-=-=-=-=-=-
>
diff mbox series

Patch

diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py
index d40a2ab8..c898be3f 100644
--- a/lib/hashserv/server.py
+++ b/lib/hashserv/server.py
@@ -9,6 +9,7 @@  import enum
 import asyncio
 import logging
 import math
+import sqlite3
 import time
 from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
 import bb.asyncrpc
@@ -114,7 +115,7 @@  class Resolve(enum.Enum):
     REPLACE = enum.auto()
 
 
-def insert_table(cursor, table, data, on_conflict):
+async def insert_table(cursor, table, data, on_conflict):
     resolve = {
         Resolve.FAIL: "",
         Resolve.IGNORE: " OR IGNORE",
@@ -129,7 +130,19 @@  def insert_table(cursor, table, data, on_conflict):
         values=", ".join(":" + k for k in keys),
     )
     prevrowid = cursor.lastrowid
-    cursor.execute(query, data)
+
+    RETRIES = 5
+    for x in range(RETRIES):
+        try:
+            cursor.execute(query, data)
+        except sqlite3.OperationalError as e:
+            if "database is locked" in str(e):
+                await asyncio.sleep(1)
+        finally:
+            break
+    else:
+        cursor.execute(query, data)
+
     logging.debug(
         "Inserting %r into %s, %s",
         data,
@@ -138,17 +151,17 @@  def insert_table(cursor, table, data, on_conflict):
     )
     return (cursor.lastrowid, cursor.lastrowid != prevrowid)
 
-def insert_unihash(cursor, data, on_conflict):
-    return insert_table(cursor, "unihashes_v2", data, on_conflict)
+async def insert_unihash(cursor, data, on_conflict):
+    return await insert_table(cursor, "unihashes_v2", data, on_conflict)
 
-def insert_outhash(cursor, data, on_conflict):
-    return insert_table(cursor, "outhashes_v2", data, on_conflict)
+async def insert_outhash(cursor, data, on_conflict):
+    return await insert_table(cursor, "outhashes_v2", data, on_conflict)
 
 async def copy_unihash_from_upstream(client, db, method, taskhash):
     d = await client.get_taskhash(method, taskhash)
     if d is not None:
         with closing(db.cursor()) as cursor:
-            insert_unihash(
+            await insert_unihash(
                 cursor,
                 {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
                 Resolve.IGNORE,
@@ -260,7 +273,7 @@  class ServerClient(bb.asyncrpc.AsyncServerConnection):
             elif self.upstream_client is not None:
                 d = await self.upstream_client.get_taskhash(method, taskhash)
                 d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
-                insert_unihash(cursor, d, Resolve.IGNORE)
+                await insert_unihash(cursor, d, Resolve.IGNORE)
                 self.db.commit()
 
         return d
@@ -301,16 +314,16 @@  class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
         return d
 
-    def update_unified(self, cursor, data):
+    async def update_unified(self, cursor, data):
         if data is None:
             return
 
-        insert_unihash(
+        await insert_unihash(
             cursor,
             {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
             Resolve.IGNORE
         )
-        insert_outhash(
+        await insert_outhash(
             cursor,
             {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
             Resolve.IGNORE
@@ -386,7 +399,7 @@  class ServerClient(bb.asyncrpc.AsyncServerConnection):
                     outhash_data[k] = data[k]
 
             # Insert the new entry, unless it already exists
-            (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)
+            (rowid, inserted) = await insert_outhash(cursor, outhash_data, Resolve.IGNORE)
 
             if inserted:
                 # If this row is new, check if it is equivalent to another
@@ -427,7 +440,7 @@  class ServerClient(bb.asyncrpc.AsyncServerConnection):
                             unihash = upstream_data['unihash']
 
 
-                insert_unihash(
+                await insert_unihash(
                     cursor,
                     {
                         'method': data['method'],
@@ -460,7 +473,7 @@  class ServerClient(bb.asyncrpc.AsyncServerConnection):
                 'taskhash': data['taskhash'],
                 'unihash': data['unihash'],
             }
-            insert_unihash(cursor, insert_data, Resolve.IGNORE)
+            await insert_unihash(cursor, insert_data, Resolve.IGNORE)
             self.db.commit()
 
             # Fetch the unihash that will be reported for the taskhash. If the