Server : Apache System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64 User : dh_edsupp ( 6597262) PHP Version : 8.2.26 Disable Function : NONE Directory : /lib/python3/dist-packages/fasteners/tests/ |
Upload File : |
# -*- coding: utf-8 -*-

#    Copyright 2011 OpenStack Foundation.
#    Copyright 2011 Justin Santa Barbara
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import errno
import fcntl
import multiprocessing
import os
import shutil
import signal
import tempfile
import threading
import time

from fasteners import process_lock as pl
from fasteners import test


class BrokenLock(pl.InterProcessLock):
    """Lock whose ``trylock`` always raises ``IOError`` with a given errno."""

    def __init__(self, name, errno_code):
        super(BrokenLock, self).__init__(name)
        self.errno_code = errno_code

    def unlock(self):
        pass

    def trylock(self):
        err = IOError()
        err.errno = self.errno_code
        raise err


def _try_lock(lock_file):
    """Child-process entry point: try to lock ``lock_file`` without blocking.

    Exits the process with status 1 when the lock could be taken and with
    status 0 when ``trylock`` raised ``IOError``.  This lives at module level
    (instead of being a closure inside the test) so that it can be used as a
    ``multiprocessing.Process`` target under start methods that pickle the
    target, not only under ``fork``.
    """
    try:
        my_lock = pl.InterProcessLock(lock_file)
        my_lock.lockfile = open(lock_file, 'w')
        my_lock.trylock()
        my_lock.unlock()
        os._exit(1)
    except IOError:
        os._exit(0)


class ProcessLockTest(test.TestCase):
    """Tests for ``InterProcessLock`` and ``interprocess_locked``."""

    def setUp(self):
        super(ProcessLockTest, self).setUp()
        self.lock_dir = tempfile.mkdtemp()
        # Every directory listed here is removed again in tearDown().
        self.tmp_dirs = [self.lock_dir]

    def tearDown(self):
        super(ProcessLockTest, self).tearDown()
        for a_dir in reversed(self.tmp_dirs):
            if os.path.exists(a_dir):
                shutil.rmtree(a_dir, ignore_errors=True)

    def test_lock_acquire_release_file_lock(self):
        """Children cannot take the lock while held, but can afterwards."""
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock = pl.InterProcessLock(lock_file)

        def attempt_acquire(count):
            # Returns how many of ``count`` children managed to take the
            # lock (each such child exits with status 1, see _try_lock).
            children = []
            for _ in range(count):
                child = multiprocessing.Process(target=_try_lock,
                                                args=(lock_file,))
                child.start()
                children.append(child)
            exit_codes = []
            for child in children:
                child.join()
                exit_codes.append(child.exitcode)
            return sum(exit_codes)

        self.assertTrue(lock.acquire())
        try:
            acquired_children = attempt_acquire(10)
            self.assertEqual(0, acquired_children)
        finally:
            lock.release()

        acquired_children = attempt_acquire(5)
        self.assertNotEqual(0, acquired_children)

    def test_nested_synchronized_external_works(self):
        """Two different locks can be held nested via the decorator."""
        sentinel = object()

        @pl.interprocess_locked(os.path.join(self.lock_dir, 'test-lock-1'))
        def outer_lock():

            @pl.interprocess_locked(os.path.join(self.lock_dir,
                                                 'test-lock-2'))
            def inner_lock():
                return sentinel

            return inner_lock()

        self.assertEqual(sentinel, outer_lock())

    def _do_test_lock_externally(self, lock_dir):
        """Fork 50 children that each flock a shared set of files.

        Every child takes the inter-process lock at ``lock_dir``/lock and,
        while holding it, locks and unlocks 50 files without blocking.  A
        child exits non-zero if any of those non-blocking locks fails or if
        anything else goes wrong; the parent requires every child to exit
        cleanly.
        """
        lock_path = os.path.join(lock_dir, "lock")

        def lock_files(handles_dir):
            with pl.InterProcessLock(lock_path):

                # Open some files we can use for locking
                handles = []
                for n in range(50):
                    path = os.path.join(handles_dir, ('file-%s' % n))
                    handles.append(open(path, 'w'))

                # Loop over all the handles and try locking the file
                # without blocking, keep a count of how many files we
                # were able to lock and then unlock. If the lock fails
                # we get an IOError and bail out with bad exit code
                count = 0
                for handle in handles:
                    try:
                        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
                        count += 1
                        fcntl.flock(handle, fcntl.LOCK_UN)
                    except IOError:
                        os._exit(2)
                    finally:
                        handle.close()

                # Check if we were able to open all files
                self.assertEqual(50, count)

        handles_dir = tempfile.mkdtemp()
        self.tmp_dirs.append(handles_dir)

        children = []
        for n in range(50):
            pid = os.fork()
            if pid:
                children.append(pid)
            else:
                # Child: must never return into the test runner.  Any
                # failure (including a failed assertion in lock_files) is
                # reported to the parent through a non-zero exit status
                # instead of being masked by an unconditional exit(0).
                exit_code = 1
                try:
                    lock_files(handles_dir)
                    exit_code = 0
                finally:
                    os._exit(exit_code)

        # Reap every child before asserting so that a single failure does
        # not leave the remaining children behind as zombies.
        statuses = [os.waitpid(child, 0)[1] for child in children]
        self.assertEqual([0] * len(children), statuses)

    def test_lock_externally(self):
        self._do_test_lock_externally(self.lock_dir)

    def test_lock_externally_lock_dir_not_exist(self):
        os.rmdir(self.lock_dir)
        self._do_test_lock_externally(self.lock_dir)

    def test_lock_file_exists(self):
        """The lock file exists while the decorated function runs."""
        lock_file = os.path.join(self.lock_dir, 'lock')

        @pl.interprocess_locked(lock_file)
        def foo():
            self.assertTrue(os.path.exists(lock_file))

        foo()

    def test_bad_acquire(self):
        """acquire() raises ThreadError when trylock fails with EBUSY."""
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock = BrokenLock(lock_file, errno.EBUSY)
        self.assertRaises(threading.ThreadError, lock.acquire)

    def test_bad_release(self):
        """release() raises ThreadError on a lock that was never acquired."""
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock = pl.InterProcessLock(lock_file)
        self.assertRaises(threading.ThreadError, lock.release)

    def test_interprocess_lock(self):
        """A lock held by a forked child cannot be taken by the parent."""
        lock_file = os.path.join(self.lock_dir, 'lock')

        pid = os.fork()
        if pid:
            try:
                # Make sure the child grabs the lock first
                start = time.time()
                while not os.path.exists(lock_file):
                    if time.time() - start > 5:
                        self.fail('Timed out waiting for child to grab lock')
                    time.sleep(0)
                lock1 = pl.InterProcessLock('foo')
                lock1.lockfile = open(lock_file, 'w')
                try:
                    # NOTE(bnemec): There is a brief window between when the
                    # lock file is created and when it actually becomes
                    # locked.  If we happen to context switch in that window
                    # we may succeed in locking the file.  Keep retrying
                    # until we either get the expected exception or timeout
                    # waiting.
                    while time.time() - start < 5:
                        try:
                            lock1.trylock()
                            lock1.unlock()
                            time.sleep(0)
                        except IOError:
                            # This is what we expect to happen
                            break
                    else:
                        self.fail('Never caught expected lock exception')
                finally:
                    # Do not leak the handle opened for the probe lock.
                    lock1.lockfile.close()
            finally:
                # We don't need to wait for the full sleep in the child
                # here.  Kill it on every path (also when the test fails)
                # and reap it so no zombie process is left behind.
                os.kill(pid, signal.SIGKILL)
                os.waitpid(pid, 0)
        else:
            try:
                lock2 = pl.InterProcessLock('foo')
                lock2.lockfile = open(lock_file, 'w')
                have_lock = False
                while not have_lock:
                    try:
                        lock2.trylock()
                        have_lock = True
                    except IOError:
                        pass
            finally:
                # NOTE(bnemec): This is racy, but I don't want to add any
                # synchronization primitives that might mask a problem
                # with the one we're trying to test here.
                time.sleep(.5)
                os._exit(0)

    def test_non_destructive(self):
        """Locking an existing file leaves its contents untouched."""
        lock_file = os.path.join(self.lock_dir, 'not-destroyed')
        with open(lock_file, 'w') as f:
            f.write('test')
        with pl.InterProcessLock(lock_file):
            with open(lock_file) as f:
                self.assertEqual(f.read(), 'test')