diff mbox series

[bitbake-devel] Add xattr and acl libraries

Message ID 20230817185318.1562354-1-JPEWhacker@gmail.com
State New
Headers show
Series [bitbake-devel] Add xattr and acl libraries | expand

Commit Message

Joshua Watt Aug. 17, 2023, 6:53 p.m. UTC
Adds Python wrappers around the xattr API from libc and the ACL API from
libacl.

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
---
 bitbake/lib/bb/acl.py   | 215 ++++++++++++++++++++++++++++++++++++++++
 bitbake/lib/bb/xattr.py | 126 +++++++++++++++++++++++
 2 files changed, 341 insertions(+)
 create mode 100755 bitbake/lib/bb/acl.py
 create mode 100755 bitbake/lib/bb/xattr.py

Comments

Piotr Łobacz Aug. 21, 2023, 9:07 p.m. UTC | #1
W dniu 17.08.2023 o 20:53, Joshua Watt pisze:
> Adds Python wrappers around the xattr API from libc and the ACL API from
> libacl.
>
Reviewed-by: Piotr Łobacz <p.lobacz@welotec.com>
Tested-by: Piotr Łobacz <p.lobacz@welotec.com>
> Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
> ---
>   bitbake/lib/bb/acl.py   | 215 ++++++++++++++++++++++++++++++++++++++++
>   bitbake/lib/bb/xattr.py | 126 +++++++++++++++++++++++
>   2 files changed, 341 insertions(+)
>   create mode 100755 bitbake/lib/bb/acl.py
>   create mode 100755 bitbake/lib/bb/xattr.py
>
> diff --git a/bitbake/lib/bb/acl.py b/bitbake/lib/bb/acl.py
> new file mode 100755
> index 00000000000..0f41b275cf6
> --- /dev/null
> +++ b/bitbake/lib/bb/acl.py
> @@ -0,0 +1,215 @@
> +#! /usr/bin/env python3
> +#
> +# Copyright 2023 by Garmin Ltd. or its subsidiaries
> +#
> +# SPDX-License-Identifier: MIT
> +
> +
> +import sys
> +import ctypes
> +import os
> +import errno
> +import pwd
> +import grp
> +
> +libacl = ctypes.CDLL("libacl.so.1", use_errno=True)
> +
> +
> +ACL_TYPE_ACCESS = 0x8000
> +ACL_TYPE_DEFAULT = 0x4000
> +
> +ACL_FIRST_ENTRY = 0
> +ACL_NEXT_ENTRY = 1
> +
> +ACL_UNDEFINED_TAG = 0x00
> +ACL_USER_OBJ = 0x01
> +ACL_USER = 0x02
> +ACL_GROUP_OBJ = 0x04
> +ACL_GROUP = 0x08
> +ACL_MASK = 0x10
> +ACL_OTHER = 0x20
> +
> +ACL_READ = 0x04
> +ACL_WRITE = 0x02
> +ACL_EXECUTE = 0x01
> +
> +acl_t = ctypes.c_void_p
> +acl_entry_t = ctypes.c_void_p
> +acl_permset_t = ctypes.c_void_p
> +acl_perm_t = ctypes.c_uint
> +
> +acl_tag_t = ctypes.c_int
> +
> +libacl.acl_free.argtypes = [acl_t]
> +
> +
> +def acl_free(acl):
> +    libacl.acl_free(acl)
> +
> +
> +libacl.acl_get_file.restype = acl_t
> +libacl.acl_get_file.argtypes = [ctypes.c_char_p, ctypes.c_uint]
> +
> +
> +def acl_get_file(path, typ):
> +    acl = libacl.acl_get_file(os.fsencode(path), typ)
> +    if acl is None:
> +        err = ctypes.get_errno()
> +        raise OSError(err, os.strerror(err), str(path))
> +
> +    return acl
> +
> +
> +libacl.acl_get_entry.argtypes = [acl_t, ctypes.c_int, ctypes.c_void_p]
> +
> +
> +def acl_get_entry(acl, entry_id):
> +    entry = acl_entry_t()
> +    ret = libacl.acl_get_entry(acl, entry_id, ctypes.byref(entry))
> +    if ret < 0:
> +        err = ctypes.get_errno()
> +        raise OSError(err, os.strerror(err))
> +
> +    if ret == 0:
> +        return None
> +
> +    return entry
> +
> +
> +libacl.acl_get_tag_type.argtypes = [acl_entry_t, ctypes.c_void_p]
> +
> +
> +def acl_get_tag_type(entry_d):
> +    tag = acl_tag_t()
> +    ret = libacl.acl_get_tag_type(entry_d, ctypes.byref(tag))
> +    if ret < 0:
> +        err = ctypes.get_errno()
> +        raise OSError(err, os.strerror(err))
> +    return tag.value
> +
> +
> +libacl.acl_get_qualifier.restype = ctypes.c_void_p
> +libacl.acl_get_qualifier.argtypes = [acl_entry_t]
> +
> +
> +def acl_get_qualifier(entry_d):
> +    ret = libacl.acl_get_qualifier(entry_d)
> +    if ret is None:
> +        err = ctypes.get_errno()
> +        raise OSError(err, os.strerror(err))
> +    return ctypes.c_void_p(ret)
> +
> +
> +libacl.acl_get_permset.argtypes = [acl_entry_t, ctypes.c_void_p]
> +
> +
> +def acl_get_permset(entry_d):
> +    permset = acl_permset_t()
> +    ret = libacl.acl_get_permset(entry_d, ctypes.byref(permset))
> +    if ret < 0:
> +        err = ctypes.get_errno()
> +        raise OSError(err, os.strerror(err))
> +
> +    return permset
> +
> +
> +libacl.acl_get_perm.argtypes = [acl_permset_t, acl_perm_t]
> +
> +
> +def acl_get_perm(permset_d, perm):
> +    ret = libacl.acl_get_perm(permset_d, perm)
> +    if ret < 0:
> +        err = ctypes.get_errno()
> +        raise OSError(err, os.strerror(err))
> +    return bool(ret)
> +
> +
> +class Entry(object):
> +    def __init__(self, tag, qualifier, mode):
> +        self.tag = tag
> +        self.qualifier = qualifier
> +        self.mode = mode
> +
> +    def __str__(self):
> +        typ = ""
> +        qual = ""
> +        if self.tag == ACL_USER:
> +            typ = "user"
> +            qual = pwd.getpwuid(self.qualifier).pw_name
> +        elif self.tag == ACL_GROUP:
> +            typ = "group"
> +            qual = grp.getgrgid(self.qualifier).gr_name
> +        elif self.tag == ACL_USER_OBJ:
> +            typ = "user"
> +        elif self.tag == ACL_GROUP_OBJ:
> +            typ = "group"
> +        elif self.tag == ACL_MASK:
> +            typ = "mask"
> +        elif self.tag == ACL_OTHER:
> +            typ = "other"
> +
> +        r = "r" if self.mode & ACL_READ else "-"
> +        w = "w" if self.mode & ACL_WRITE else "-"
> +        x = "x" if self.mode & ACL_EXECUTE else "-"
> +
> +        return f"{typ}:{qual}:{r}{w}{x}"
> +
> +
> +class ACL(object):
> +    def __init__(self, acl):
> +        self.acl = acl
> +
> +    def __del__(self):
> +        acl_free(self.acl)
> +
> +    def entries(self):
> +        entry_id = ACL_FIRST_ENTRY
> +        while True:
> +            entry = acl_get_entry(self.acl, entry_id)
> +            if entry is None:
> +                break
> +
> +            permset = acl_get_permset(entry)
> +
> +            mode = 0
> +            for m in (ACL_READ, ACL_WRITE, ACL_EXECUTE):
> +                if acl_get_perm(permset, m):
> +                    mode |= m
> +
> +            qualifier = None
> +            tag = acl_get_tag_type(entry)
> +
> +            if tag == ACL_USER or tag == ACL_GROUP:
> +                qual = acl_get_qualifier(entry)
> +                qualifier = ctypes.cast(qual, ctypes.POINTER(ctypes.c_int))[0]
> +
> +            yield Entry(tag, qualifier, mode)
> +
> +            entry_id = ACL_NEXT_ENTRY
> +
> +    @classmethod
> +    def from_path(cls, path, typ):
> +        acl = acl_get_file(path, typ)
> +        return cls(acl)
> +
> +
> +def main():
> +    import argparse
> +    import pwd
> +    import grp
> +    from pathlib import Path
> +
> +    parser = argparse.ArgumentParser()
> +    parser.add_argument("path", help="File Path", type=Path)
> +
> +    args = parser.parse_args()
> +
> +    acl = ACL.from_path(args.path, ACL_TYPE_ACCESS)
> +    for entry in acl.entries():
> +        print(str(entry))
> +
> +    return 0
> +
> +
> +if __name__ == "__main__":
> +    sys.exit(main())
> diff --git a/bitbake/lib/bb/xattr.py b/bitbake/lib/bb/xattr.py
> new file mode 100755
> index 00000000000..7b634944a40
> --- /dev/null
> +++ b/bitbake/lib/bb/xattr.py
> @@ -0,0 +1,126 @@
> +#! /usr/bin/env python3
> +#
> +# Copyright 2023 by Garmin Ltd. or its subsidiaries
> +#
> +# SPDX-License-Identifier: MIT
> +
> +import sys
> +import ctypes
> +import os
> +import errno
> +
> +libc = ctypes.CDLL("libc.so.6", use_errno=True)
> +fsencoding = sys.getfilesystemencoding()
> +
> +
> +libc.listxattr.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_size_t]
> +libc.llistxattr.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_size_t]
> +
> +
> +def listxattr(path, follow=True):
> +    func = libc.listxattr if follow else libc.llistxattr
> +
> +    os_path = os.fsencode(path)
> +
> +    while True:
> +        length = func(os_path, None, 0)
> +
> +        if length < 0:
> +            err = ctypes.get_errno()
> +            raise OSError(err, os.strerror(err), str(path))
> +
> +        if length == 0:
> +            return []
> +
> +        arr = ctypes.create_string_buffer(length)
> +
> +        read_length = func(os_path, arr, length)
> +        if read_length != length:
> +            # Race!
> +            continue
> +
> +        return [a.decode(fsencoding) for a in arr.raw.split(b"\x00") if a]
> +
> +
> +libc.getxattr.argtypes = [
> +    ctypes.c_char_p,
> +    ctypes.c_char_p,
> +    ctypes.c_char_p,
> +    ctypes.c_size_t,
> +]
> +libc.lgetxattr.argtypes = [
> +    ctypes.c_char_p,
> +    ctypes.c_char_p,
> +    ctypes.c_char_p,
> +    ctypes.c_size_t,
> +]
> +
> +
> +def getxattr(path, name, follow=True):
> +    func = libc.getxattr if follow else libc.lgetxattr
> +
> +    os_path = os.fsencode(path)
> +    os_name = os.fsencode(name)
> +
> +    while True:
> +        length = func(os_path, os_name, None, 0)
> +
> +        if length < 0:
> +            err = ctypes.get_errno()
> +            if err == errno.ENODATA:
> +                return None
> +            raise OSError(err, os.strerror(err), str(path))
> +
> +        if length == 0:
> +            return ""
> +
> +        arr = ctypes.create_string_buffer(length)
> +
> +        read_length = func(os_path, os_name, arr, length)
> +        if read_length != length:
> +            # Race!
> +            continue
> +
> +        return arr.raw
> +
> +
> +def get_all_xattr(path, follow=True):
> +    attrs = {}
> +
> +    names = listxattr(path, follow)
> +
> +    for name in names:
> +        value = getxattr(path, name, follow)
> +        if value is None:
> +            # This can happen if a value is erased after listxattr is called,
> +            # so ignore it
> +            continue
> +        attrs[name] = value
> +
> +    return attrs
> +
> +
> +def main():
> +    import argparse
> +    from pathlib import Path
> +
> +    parser = argparse.ArgumentParser()
> +    parser.add_argument("path", help="File Path", type=Path)
> +
> +    args = parser.parse_args()
> +
> +    attrs = get_all_xattr(args.path)
> +
> +    for name, value in attrs.items():
> +        try:
> +            value = value.decode(fsencoding)
> +        except UnicodeDecodeError:
> +            pass
> +
> +        print(f"{name} = {value}")
> +
> +    return 0
> +
> +
> +if __name__ == "__main__":
> +    sys.exit(main())
diff mbox series

Patch

diff --git a/bitbake/lib/bb/acl.py b/bitbake/lib/bb/acl.py
new file mode 100755
index 00000000000..0f41b275cf6
--- /dev/null
+++ b/bitbake/lib/bb/acl.py
@@ -0,0 +1,215 @@ 
+#! /usr/bin/env python3
+#
+# Copyright 2023 by Garmin Ltd. or its subsidiaries
+#
+# SPDX-License-Identifier: MIT
+
+
+import sys
+import ctypes
+import os
+import errno
+import pwd
+import grp
+
+libacl = ctypes.CDLL("libacl.so.1", use_errno=True)
+
+
+ACL_TYPE_ACCESS = 0x8000
+ACL_TYPE_DEFAULT = 0x4000
+
+ACL_FIRST_ENTRY = 0
+ACL_NEXT_ENTRY = 1
+
+ACL_UNDEFINED_TAG = 0x00
+ACL_USER_OBJ = 0x01
+ACL_USER = 0x02
+ACL_GROUP_OBJ = 0x04
+ACL_GROUP = 0x08
+ACL_MASK = 0x10
+ACL_OTHER = 0x20
+
+ACL_READ = 0x04
+ACL_WRITE = 0x02
+ACL_EXECUTE = 0x01
+
+acl_t = ctypes.c_void_p
+acl_entry_t = ctypes.c_void_p
+acl_permset_t = ctypes.c_void_p
+acl_perm_t = ctypes.c_uint
+
+acl_tag_t = ctypes.c_int
+
+libacl.acl_free.argtypes = [acl_t]
+
+
+def acl_free(acl):
+    libacl.acl_free(acl)
+
+
+libacl.acl_get_file.restype = acl_t
+libacl.acl_get_file.argtypes = [ctypes.c_char_p, ctypes.c_uint]
+
+
+def acl_get_file(path, typ):
+    acl = libacl.acl_get_file(os.fsencode(path), typ)
+    if acl is None:
+        err = ctypes.get_errno()
+        raise OSError(err, os.strerror(err), str(path))
+
+    return acl
+
+
+libacl.acl_get_entry.argtypes = [acl_t, ctypes.c_int, ctypes.c_void_p]
+
+
+def acl_get_entry(acl, entry_id):
+    entry = acl_entry_t()
+    ret = libacl.acl_get_entry(acl, entry_id, ctypes.byref(entry))
+    if ret < 0:
+        err = ctypes.get_errno()
+        raise OSError(err, os.strerror(err))
+
+    if ret == 0:
+        return None
+
+    return entry
+
+
+libacl.acl_get_tag_type.argtypes = [acl_entry_t, ctypes.c_void_p]
+
+
+def acl_get_tag_type(entry_d):
+    tag = acl_tag_t()
+    ret = libacl.acl_get_tag_type(entry_d, ctypes.byref(tag))
+    if ret < 0:
+        err = ctypes.get_errno()
+        raise OSError(err, os.strerror(err))
+    return tag.value
+
+
+libacl.acl_get_qualifier.restype = ctypes.c_void_p
+libacl.acl_get_qualifier.argtypes = [acl_entry_t]
+
+
+def acl_get_qualifier(entry_d):
+    ret = libacl.acl_get_qualifier(entry_d)
+    if ret is None:
+        err = ctypes.get_errno()
+        raise OSError(err, os.strerror(err))
+    return ctypes.c_void_p(ret)
+
+
+libacl.acl_get_permset.argtypes = [acl_entry_t, ctypes.c_void_p]
+
+
+def acl_get_permset(entry_d):
+    permset = acl_permset_t()
+    ret = libacl.acl_get_permset(entry_d, ctypes.byref(permset))
+    if ret < 0:
+        err = ctypes.get_errno()
+        raise OSError(err, os.strerror(err))
+
+    return permset
+
+
+libacl.acl_get_perm.argtypes = [acl_permset_t, acl_perm_t]
+
+
+def acl_get_perm(permset_d, perm):
+    ret = libacl.acl_get_perm(permset_d, perm)
+    if ret < 0:
+        err = ctypes.get_errno()
+        raise OSError(err, os.strerror(err))
+    return bool(ret)
+
+
+class Entry(object):
+    def __init__(self, tag, qualifier, mode):
+        self.tag = tag
+        self.qualifier = qualifier
+        self.mode = mode
+
+    def __str__(self):
+        typ = ""
+        qual = ""
+        if self.tag == ACL_USER:
+            typ = "user"
+            qual = pwd.getpwuid(self.qualifier).pw_name
+        elif self.tag == ACL_GROUP:
+            typ = "group"
+            qual = grp.getgrgid(self.qualifier).gr_name
+        elif self.tag == ACL_USER_OBJ:
+            typ = "user"
+        elif self.tag == ACL_GROUP_OBJ:
+            typ = "group"
+        elif self.tag == ACL_MASK:
+            typ = "mask"
+        elif self.tag == ACL_OTHER:
+            typ = "other"
+
+        r = "r" if self.mode & ACL_READ else "-"
+        w = "w" if self.mode & ACL_WRITE else "-"
+        x = "x" if self.mode & ACL_EXECUTE else "-"
+
+        return f"{typ}:{qual}:{r}{w}{x}"
+
+
+class ACL(object):
+    def __init__(self, acl):
+        self.acl = acl
+
+    def __del__(self):
+        acl_free(self.acl)
+
+    def entries(self):
+        entry_id = ACL_FIRST_ENTRY
+        while True:
+            entry = acl_get_entry(self.acl, entry_id)
+            if entry is None:
+                break
+
+            permset = acl_get_permset(entry)
+
+            mode = 0
+            for m in (ACL_READ, ACL_WRITE, ACL_EXECUTE):
+                if acl_get_perm(permset, m):
+                    mode |= m
+
+            qualifier = None
+            tag = acl_get_tag_type(entry)
+
+            if tag == ACL_USER or tag == ACL_GROUP:
+                qual = acl_get_qualifier(entry)
+                qualifier = ctypes.cast(qual, ctypes.POINTER(ctypes.c_int))[0]
+
+            yield Entry(tag, qualifier, mode)
+
+            entry_id = ACL_NEXT_ENTRY
+
+    @classmethod
+    def from_path(cls, path, typ):
+        acl = acl_get_file(path, typ)
+        return cls(acl)
+
+
+def main():
+    import argparse
+    import pwd
+    import grp
+    from pathlib import Path
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("path", help="File Path", type=Path)
+
+    args = parser.parse_args()
+
+    acl = ACL.from_path(args.path, ACL_TYPE_ACCESS)
+    for entry in acl.entries():
+        print(str(entry))
+
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/bitbake/lib/bb/xattr.py b/bitbake/lib/bb/xattr.py
new file mode 100755
index 00000000000..7b634944a40
--- /dev/null
+++ b/bitbake/lib/bb/xattr.py
@@ -0,0 +1,126 @@ 
+#! /usr/bin/env python3
+#
+# Copyright 2023 by Garmin Ltd. or its subsidiaries
+#
+# SPDX-License-Identifier: MIT
+
+import sys
+import ctypes
+import os
+import errno
+
+libc = ctypes.CDLL("libc.so.6", use_errno=True)
+fsencoding = sys.getfilesystemencoding()
+
+
+libc.listxattr.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_size_t]
+libc.llistxattr.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_size_t]
+
+
+def listxattr(path, follow=True):
+    func = libc.listxattr if follow else libc.llistxattr
+
+    os_path = os.fsencode(path)
+
+    while True:
+        length = func(os_path, None, 0)
+
+        if length < 0:
+            err = ctypes.get_errno()
+            raise OSError(err, os.strerror(err), str(path))
+
+        if length == 0:
+            return []
+
+        arr = ctypes.create_string_buffer(length)
+
+        read_length = func(os_path, arr, length)
+        if read_length != length:
+            # Race!
+            continue
+
+        return [a.decode(fsencoding) for a in arr.raw.split(b"\x00") if a]
+
+
+libc.getxattr.argtypes = [
+    ctypes.c_char_p,
+    ctypes.c_char_p,
+    ctypes.c_char_p,
+    ctypes.c_size_t,
+]
+libc.lgetxattr.argtypes = [
+    ctypes.c_char_p,
+    ctypes.c_char_p,
+    ctypes.c_char_p,
+    ctypes.c_size_t,
+]
+
+
+def getxattr(path, name, follow=True):
+    func = libc.getxattr if follow else libc.lgetxattr
+
+    os_path = os.fsencode(path)
+    os_name = os.fsencode(name)
+
+    while True:
+        length = func(os_path, os_name, None, 0)
+
+        if length < 0:
+            err = ctypes.get_errno()
+            if err == errno.ENODATA:
+                return None
+            raise OSError(err, os.strerror(err), str(path))
+
+        if length == 0:
+            return ""
+
+        arr = ctypes.create_string_buffer(length)
+
+        read_length = func(os_path, os_name, arr, length)
+        if read_length != length:
+            # Race!
+            continue
+
+        return arr.raw
+
+
+def get_all_xattr(path, follow=True):
+    attrs = {}
+
+    names = listxattr(path, follow)
+
+    for name in names:
+        value = getxattr(path, name, follow)
+        if value is None:
+            # This can happen if a value is erased after listxattr is called,
+            # so ignore it
+            continue
+        attrs[name] = value
+
+    return attrs
+
+
+def main():
+    import argparse
+    from pathlib import Path
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("path", help="File Path", type=Path)
+
+    args = parser.parse_args()
+
+    attrs = get_all_xattr(args.path)
+
+    for name, value in attrs.items():
+        try:
+            value = value.decode(fsencoding)
+        except UnicodeDecodeError:
+            pass
+
+        print(f"{name} = {value}")
+
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())