tests_synchronisation.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. from functools import wraps
  2. from threading import Event
  3. from time import sleep, time
  4. from tqdm import TMonitor, tqdm, trange
  5. from .tests_tqdm import StringIO, closing, importorskip, patch_lock, skip
  6. class Time(object):
  7. """Fake time class class providing an offset"""
  8. offset = 0
  9. @classmethod
  10. def reset(cls):
  11. """zeroes internal offset"""
  12. cls.offset = 0
  13. @classmethod
  14. def time(cls):
  15. """time.time() + offset"""
  16. return time() + cls.offset
  17. @staticmethod
  18. def sleep(dur):
  19. """identical to time.sleep()"""
  20. sleep(dur)
  21. @classmethod
  22. def fake_sleep(cls, dur):
  23. """adds `dur` to internal offset"""
  24. cls.offset += dur
  25. sleep(0.000001) # sleep to allow interrupt (instead of pass)
  26. class FakeEvent(Event):
  27. """patched `threading.Event` where `wait()` uses `Time.fake_sleep()`"""
  28. def wait(self, timeout=None):
  29. """uses Time.fake_sleep"""
  30. if timeout is not None:
  31. Time.fake_sleep(timeout)
  32. return self.is_set()
  33. def patch_sleep(func):
  34. """Temporarily makes TMonitor use Time.fake_sleep"""
  35. @wraps(func)
  36. def inner(*args, **kwargs):
  37. """restores TMonitor on completion regardless of Exceptions"""
  38. TMonitor._test["time"] = Time.time
  39. TMonitor._test["Event"] = FakeEvent
  40. if tqdm.monitor:
  41. assert not tqdm.monitor.get_instances()
  42. tqdm.monitor.exit()
  43. del tqdm.monitor
  44. tqdm.monitor = None
  45. try:
  46. return func(*args, **kwargs)
  47. finally:
  48. # Check that class var monitor is deleted if no instance left
  49. tqdm.monitor_interval = 10
  50. if tqdm.monitor:
  51. assert not tqdm.monitor.get_instances()
  52. tqdm.monitor.exit()
  53. del tqdm.monitor
  54. tqdm.monitor = None
  55. TMonitor._test.pop("Event")
  56. TMonitor._test.pop("time")
  57. return inner
  58. def cpu_timify(t, timer=Time):
  59. """Force tqdm to use the specified timer instead of system-wide time"""
  60. t._time = timer.time
  61. t._sleep = timer.fake_sleep
  62. t.start_t = t.last_print_t = t._time()
  63. return timer
  64. class FakeTqdm(object):
  65. _instances = set()
  66. get_lock = tqdm.get_lock
  67. def incr(x):
  68. return x + 1
  69. def incr_bar(x):
  70. with closing(StringIO()) as our_file:
  71. for _ in trange(x, lock_args=(False,), file=our_file):
  72. pass
  73. return incr(x)
  74. @patch_sleep
  75. def test_monitor_thread():
  76. """Test dummy monitoring thread"""
  77. monitor = TMonitor(FakeTqdm, 10)
  78. # Test if alive, then killed
  79. assert monitor.report()
  80. monitor.exit()
  81. assert not monitor.report()
  82. assert not monitor.is_alive()
  83. del monitor
  84. @patch_sleep
  85. def test_monitoring_and_cleanup():
  86. """Test for stalled tqdm instance and monitor deletion"""
  87. # Note: should fix miniters for these tests, else with dynamic_miniters
  88. # it's too complicated to handle with monitoring update and maxinterval...
  89. maxinterval = tqdm.monitor_interval
  90. assert maxinterval == 10
  91. total = 1000
  92. with closing(StringIO()) as our_file:
  93. with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
  94. maxinterval=maxinterval) as t:
  95. cpu_timify(t, Time)
  96. # Do a lot of iterations in a small timeframe
  97. # (smaller than monitor interval)
  98. Time.fake_sleep(maxinterval / 10) # monitor won't wake up
  99. t.update(500)
  100. # check that our fixed miniters is still there
  101. assert t.miniters <= 500 # TODO: should really be == 500
  102. # Then do 1 it after monitor interval, so that monitor kicks in
  103. Time.fake_sleep(maxinterval)
  104. t.update(1)
  105. # Wait for the monitor to get out of sleep's loop and update tqdm.
  106. timeend = Time.time()
  107. while not (t.monitor.woken >= timeend and t.miniters == 1):
  108. Time.fake_sleep(1) # Force awake up if it woken too soon
  109. assert t.miniters == 1 # check that monitor corrected miniters
  110. # Note: at this point, there may be a race condition: monitor saved
  111. # current woken time but Time.sleep() happen just before monitor
  112. # sleep. To fix that, either sleep here or increase time in a loop
  113. # to ensure that monitor wakes up at some point.
  114. # Try again but already at miniters = 1 so nothing will be done
  115. Time.fake_sleep(maxinterval)
  116. t.update(2)
  117. timeend = Time.time()
  118. while t.monitor.woken < timeend:
  119. Time.fake_sleep(1) # Force awake if it woken too soon
  120. # Wait for the monitor to get out of sleep's loop and update
  121. # tqdm
  122. assert t.miniters == 1 # check that monitor corrected miniters
  123. @patch_sleep
  124. def test_monitoring_multi():
  125. """Test on multiple bars, one not needing miniters adjustment"""
  126. # Note: should fix miniters for these tests, else with dynamic_miniters
  127. # it's too complicated to handle with monitoring update and maxinterval...
  128. maxinterval = tqdm.monitor_interval
  129. assert maxinterval == 10
  130. total = 1000
  131. with closing(StringIO()) as our_file:
  132. with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
  133. maxinterval=maxinterval) as t1:
  134. # Set high maxinterval for t2 so monitor does not need to adjust it
  135. with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
  136. maxinterval=1E5) as t2:
  137. cpu_timify(t1, Time)
  138. cpu_timify(t2, Time)
  139. # Do a lot of iterations in a small timeframe
  140. Time.fake_sleep(maxinterval / 10)
  141. t1.update(500)
  142. t2.update(500)
  143. assert t1.miniters <= 500 # TODO: should really be == 500
  144. assert t2.miniters == 500
  145. # Then do 1 it after monitor interval, so that monitor kicks in
  146. Time.fake_sleep(maxinterval)
  147. t1.update(1)
  148. t2.update(1)
  149. # Wait for the monitor to get out of sleep and update tqdm
  150. timeend = Time.time()
  151. while not (t1.monitor.woken >= timeend and t1.miniters == 1):
  152. Time.fake_sleep(1)
  153. assert t1.miniters == 1 # check that monitor corrected miniters
  154. assert t2.miniters == 500 # check that t2 was not adjusted
  155. def test_imap():
  156. """Test multiprocessing.Pool"""
  157. try:
  158. from multiprocessing import Pool
  159. except ImportError as err:
  160. skip(str(err))
  161. pool = Pool()
  162. res = list(tqdm(pool.imap(incr, range(100)), disable=True))
  163. pool.close()
  164. assert res[-1] == 100
  165. @patch_lock(thread=True)
  166. def test_threadpool():
  167. """Test concurrent.futures.ThreadPoolExecutor"""
  168. ThreadPoolExecutor = importorskip('concurrent.futures').ThreadPoolExecutor
  169. with ThreadPoolExecutor(8) as pool:
  170. res = list(tqdm(pool.map(incr_bar, range(100)), disable=True))
  171. assert sum(res) == sum(range(1, 101))