Skip to content

Locking

AlreadyLocked

Bases: Exception

Exception that will be thrown when a lock is already acquired.

Source code in splitgill/locking.py
10
11
12
13
14
15
16
17
class AlreadyLocked(Exception):
    """
    Exception that will be thrown when a lock is already acquired.
    """

    def __init__(self, lock_id: str):
        super().__init__(f"Lock '{lock_id}' is already locked")
        self.lock_id = lock_id

LockManager

Class for managing locks.

Source code in splitgill/locking.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class LockManager:
    """
    Class for managing locks.
    """

    def __init__(self, lock_collection: Collection):
        """
        :param lock_collection: the collection to use for lock data
        """
        self.lock_collection = lock_collection
        # does nothing if this already exists
        self.lock_collection.create_index('lock_id', unique=True)

    def acquire(self, lock_id: str, raise_on_fail: bool = False, **kwargs) -> bool:
        """
        Acquire the lock with the given lock_id. If the lock can't be acquired, False is
        returned, if it can True is returned. If raise_on_fail is set to True, a
        AlreadyLocked exception is raised if the lock can't be acquired.

        Any additional keyword arguments provided are stored in the lock collection with
        the core lock metadata.

        :param lock_id: the ID of the lock to acquire
        :param raise_on_fail: if True, raises an AlreadyLocked exception if the lock
            can't be acquired. Default: False.
        :return: True if the lock was acquired, False if not
        """
        try:
            doc = {
                'lock_id': lock_id,
                'locked_at': datetime.now(timezone.utc),
                'locked_by': platform.node(),
            }
            if kwargs:
                doc['data'] = kwargs
            self.lock_collection.insert_one(doc)
        except DuplicateKeyError:
            if raise_on_fail:
                raise AlreadyLocked(lock_id)
            return False
        return True

    def release(self, lock_id: str):
        """
        Release the lock with the given lock_id. If the lock_id isn't locked, does
        nothing.

        :param lock_id: the ID of the lock to release
        """
        self.lock_collection.delete_one({'lock_id': lock_id})

    def is_locked(self, lock_id: str) -> bool:
        """
        Check if the given lock_id is locked or not.

        :param lock_id: the ID of the lock to check
        :return: True if the lock is currently acquired, False if not
        """
        return self.get_metadata(lock_id) is not None

    def get_metadata(self, lock_id: str) -> Optional[dict]:
        """
        Returns the doc stored in the lock collection for the given lock ID, if there is
        one.

        :param lock_id:
        :return:
        """
        return self.lock_collection.find_one({'lock_id': lock_id})

    @contextmanager
    def lock(self, lock_id: str, **kwargs):
        """
        Context manager to safely acquire and release a lock with the given lock ID. If
        the lock is already acquired, raises an AlreadyLocked exception.

        Any additional keyword arguments provided are stored in the lock collection with
        the core lock metadata.

        :param lock_id: ID of the lock to acquire and release
        """
        self.acquire(lock_id, raise_on_fail=True, **kwargs)
        try:
            yield
        finally:
            self.release(lock_id)

__init__(lock_collection)

Parameters:

Name Type Description Default
lock_collection Collection

the collection to use for lock data

required
Source code in splitgill/locking.py
25
26
27
28
29
30
31
def __init__(self, lock_collection: Collection):
    """
    :param lock_collection: the collection to use for lock data
    """
    self.lock_collection = lock_collection
    # does nothing if this already exists
    self.lock_collection.create_index('lock_id', unique=True)

acquire(lock_id, raise_on_fail=False, **kwargs)

Acquire the lock with the given lock_id. If the lock can't be acquired, False is returned, if it can True is returned. If raise_on_fail is set to True, a AlreadyLocked exception is raised if the lock can't be acquired.

Any additional keyword arguments provided are stored in the lock collection with the core lock metadata.

Parameters:

Name Type Description Default
lock_id str

the ID of the lock to acquire

required
raise_on_fail bool

if True, raises an AlreadyLocked exception if the lock can't be acquired. Default: False.

False

Returns:

Type Description
bool

True if the lock was acquired, False if not

Source code in splitgill/locking.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def acquire(self, lock_id: str, raise_on_fail: bool = False, **kwargs) -> bool:
    """
    Acquire the lock with the given lock_id. If the lock can't be acquired, False is
    returned, if it can True is returned. If raise_on_fail is set to True, a
    AlreadyLocked exception is raised if the lock can't be acquired.

    Any additional keyword arguments provided are stored in the lock collection with
    the core lock metadata.

    :param lock_id: the ID of the lock to acquire
    :param raise_on_fail: if True, raises an AlreadyLocked exception if the lock
        can't be acquired. Default: False.
    :return: True if the lock was acquired, False if not
    """
    try:
        doc = {
            'lock_id': lock_id,
            'locked_at': datetime.now(timezone.utc),
            'locked_by': platform.node(),
        }
        if kwargs:
            doc['data'] = kwargs
        self.lock_collection.insert_one(doc)
    except DuplicateKeyError:
        if raise_on_fail:
            raise AlreadyLocked(lock_id)
        return False
    return True

get_metadata(lock_id)

Returns the doc stored in the lock collection for the given lock ID, if there is one.

Parameters:

Name Type Description Default
lock_id str
required

Returns:

Type Description
Optional[dict]
Source code in splitgill/locking.py
80
81
82
83
84
85
86
87
88
def get_metadata(self, lock_id: str) -> Optional[dict]:
    """
    Returns the doc stored in the lock collection for the given lock ID, if there is
    one.

    :param lock_id:
    :return:
    """
    return self.lock_collection.find_one({'lock_id': lock_id})

is_locked(lock_id)

Check if the given lock_id is locked or not.

Parameters:

Name Type Description Default
lock_id str

the ID of the lock to check

required

Returns:

Type Description
bool

True if the lock is currently acquired, False if not

Source code in splitgill/locking.py
71
72
73
74
75
76
77
78
def is_locked(self, lock_id: str) -> bool:
    """
    Check if the given lock_id is locked or not.

    :param lock_id: the ID of the lock to check
    :return: True if the lock is currently acquired, False if not
    """
    return self.get_metadata(lock_id) is not None

lock(lock_id, **kwargs)

Context manager to safely acquire and release a lock with the given lock ID. If the lock is already acquired, raises an AlreadyLocked exception.

Any additional keyword arguments provided are stored in the lock collection with the core lock metadata.

Parameters:

Name Type Description Default
lock_id str

ID of the lock to acquire and release

required
Source code in splitgill/locking.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@contextmanager
def lock(self, lock_id: str, **kwargs):
    """
    Context manager to safely acquire and release a lock with the given lock ID. If
    the lock is already acquired, raises an AlreadyLocked exception.

    Any additional keyword arguments provided are stored in the lock collection with
    the core lock metadata.

    :param lock_id: ID of the lock to acquire and release
    """
    self.acquire(lock_id, raise_on_fail=True, **kwargs)
    try:
        yield
    finally:
        self.release(lock_id)

release(lock_id)

Release the lock with the given lock_id. If the lock_id isn't locked, does nothing.

Parameters:

Name Type Description Default
lock_id str

the ID of the lock to release

required
Source code in splitgill/locking.py
62
63
64
65
66
67
68
69
def release(self, lock_id: str):
    """
    Release the lock with the given lock_id. If the lock_id isn't locked, does
    nothing.

    :param lock_id: the ID of the lock to release
    """
    self.lock_collection.delete_one({'lock_id': lock_id})