from functools import wraps from threading import Event from time import sleep, time from tqdm import TMonitor, tqdm, trange from .tests_tqdm import StringIO, closing, importorskip, patch_lock, skip class Time(object): """Fake time class class providing an offset""" offset = 0 @classmethod def reset(cls): """zeroes internal offset""" cls.offset = 0 @classmethod def time(cls): """time.time() + offset""" return time() + cls.offset @staticmethod def sleep(dur): """identical to time.sleep()""" sleep(dur) @classmethod def fake_sleep(cls, dur): """adds `dur` to internal offset""" cls.offset += dur sleep(0.000001) # sleep to allow interrupt (instead of pass) class FakeEvent(Event): """patched `threading.Event` where `wait()` uses `Time.fake_sleep()`""" def wait(self, timeout=None): """uses Time.fake_sleep""" if timeout is not None: Time.fake_sleep(timeout) return self.is_set() def patch_sleep(func): """Temporarily makes TMonitor use Time.fake_sleep""" @wraps(func) def inner(*args, **kwargs): """restores TMonitor on completion regardless of Exceptions""" TMonitor._test["time"] = Time.time TMonitor._test["Event"] = FakeEvent if tqdm.monitor: assert not tqdm.monitor.get_instances() tqdm.monitor.exit() del tqdm.monitor tqdm.monitor = None try: return func(*args, **kwargs) finally: # Check that class var monitor is deleted if no instance left tqdm.monitor_interval = 10 if tqdm.monitor: assert not tqdm.monitor.get_instances() tqdm.monitor.exit() del tqdm.monitor tqdm.monitor = None TMonitor._test.pop("Event") TMonitor._test.pop("time") return inner def cpu_timify(t, timer=Time): """Force tqdm to use the specified timer instead of system-wide time""" t._time = timer.time t._sleep = timer.fake_sleep t.start_t = t.last_print_t = t._time() return timer class FakeTqdm(object): _instances = set() get_lock = tqdm.get_lock def incr(x): return x + 1 def incr_bar(x): with closing(StringIO()) as our_file: for _ in trange(x, lock_args=(False,), file=our_file): pass return incr(x) @patch_sleep def test_monitor_thread(): """Test dummy monitoring thread""" monitor = TMonitor(FakeTqdm, 10) # Test if alive, then killed assert monitor.report() monitor.exit() assert not monitor.report() assert not monitor.is_alive() del monitor @patch_sleep def test_monitoring_and_cleanup(): """Test for stalled tqdm instance and monitor deletion""" # Note: should fix miniters for these tests, else with dynamic_miniters # it's too complicated to handle with monitoring update and maxinterval... maxinterval = tqdm.monitor_interval assert maxinterval == 10 total = 1000 with closing(StringIO()) as our_file: with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1, maxinterval=maxinterval) as t: cpu_timify(t, Time) # Do a lot of iterations in a small timeframe # (smaller than monitor interval) Time.fake_sleep(maxinterval / 10) # monitor won't wake up t.update(500) # check that our fixed miniters is still there assert t.miniters <= 500 # TODO: should really be == 500 # Then do 1 it after monitor interval, so that monitor kicks in Time.fake_sleep(maxinterval) t.update(1) # Wait for the monitor to get out of sleep's loop and update tqdm. timeend = Time.time() while not (t.monitor.woken >= timeend and t.miniters == 1): Time.fake_sleep(1) # Force awake up if it woken too soon assert t.miniters == 1 # check that monitor corrected miniters # Note: at this point, there may be a race condition: monitor saved # current woken time but Time.sleep() happen just before monitor # sleep. To fix that, either sleep here or increase time in a loop # to ensure that monitor wakes up at some point. # Try again but already at miniters = 1 so nothing will be done Time.fake_sleep(maxinterval) t.update(2) timeend = Time.time() while t.monitor.woken < timeend: Time.fake_sleep(1) # Force awake if it woken too soon # Wait for the monitor to get out of sleep's loop and update # tqdm assert t.miniters == 1 # check that monitor corrected miniters @patch_sleep def test_monitoring_multi(): """Test on multiple bars, one not needing miniters adjustment""" # Note: should fix miniters for these tests, else with dynamic_miniters # it's too complicated to handle with monitoring update and maxinterval... maxinterval = tqdm.monitor_interval assert maxinterval == 10 total = 1000 with closing(StringIO()) as our_file: with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1, maxinterval=maxinterval) as t1: # Set high maxinterval for t2 so monitor does not need to adjust it with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1, maxinterval=1E5) as t2: cpu_timify(t1, Time) cpu_timify(t2, Time) # Do a lot of iterations in a small timeframe Time.fake_sleep(maxinterval / 10) t1.update(500) t2.update(500) assert t1.miniters <= 500 # TODO: should really be == 500 assert t2.miniters == 500 # Then do 1 it after monitor interval, so that monitor kicks in Time.fake_sleep(maxinterval) t1.update(1) t2.update(1) # Wait for the monitor to get out of sleep and update tqdm timeend = Time.time() while not (t1.monitor.woken >= timeend and t1.miniters == 1): Time.fake_sleep(1) assert t1.miniters == 1 # check that monitor corrected miniters assert t2.miniters == 500 # check that t2 was not adjusted def test_imap(): """Test multiprocessing.Pool""" try: from multiprocessing import Pool except ImportError as err: skip(str(err)) pool = Pool() res = list(tqdm(pool.imap(incr, range(100)), disable=True)) pool.close() assert res[-1] == 100 @patch_lock(thread=True) def test_threadpool(): """Test concurrent.futures.ThreadPoolExecutor""" ThreadPoolExecutor = importorskip('concurrent.futures').ThreadPoolExecutor with ThreadPoolExecutor(8) as pool: res = list(tqdm(pool.map(incr_bar, range(100)), disable=True)) assert sum(res) == sum(range(1, 101))