test_iterutils.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553
  1. import os
  2. import pytest
  3. from boltons.dictutils import OMD
  4. from boltons.iterutils import (first,
  5. remap,
  6. research,
  7. default_enter,
  8. default_exit,
  9. get_path)
  10. from boltons.namedutils import namedtuple
  # Absolute path of this test module; opened as a real file by
  # TestRemap.test_remap_file.
  CUR_PATH = os.path.abspath(__file__)


  def isbool(x):
      """Return True when *x* is a ``bool`` instance."""
      return isinstance(x, bool)


  def isint(x):
      """Return True when *x* is an ``int`` instance.

      ``bool`` is a subclass of ``int``, so True and False also qualify.
      """
      return isinstance(x, int)


  def odd(x):
      """Return True when *x* is an int (per ``isint``) not divisible by 2."""
      return isint(x) and x % 2 != 0


  def even(x):
      """Return True when *x* is an int (per ``isint``) divisible by 2."""
      return isint(x) and x % 2 == 0


  def is_meaning_of_life(x):
      """Return True when *x* compares equal to 42."""
      return x == 42
  class TestFirst(object):
      """Checks ``first`` for empty input, default fallback and key selection."""

      def test_empty_iterables(self):
          """
          With nothing to choose from, the result is None.
          """
          for empty in (set(), []):
              assert first(empty) is None

      def test_default_value(self):
          """
          When no item qualifies, the supplied default comes back.
          """
          assert first(set(), default=42) == 42
          assert first([], default=3.14) == 3.14

          # every member is falsy, so the default is still expected
          all_falsy = [0, False, []]
          assert first(all_falsy, default=3.14) == 3.14

      def test_selection(self):
          """
          Picks made with the implicit truthiness test and with key functions.
          """
          candidates = [(), 0, False, 3, []]

          # no key: 3 is the only truthy member
          assert first(candidates, default=42) == 3

          # explicit key functions
          assert first(candidates, key=isint) == 0
          assert first(candidates, key=isbool) is False
          assert first(candidates, key=odd) == 3
          assert first(candidates, key=even) == 0

          # nothing equals 42 and no default is given
          assert first(candidates, key=is_meaning_of_life) is None
  class TestRemap(object):
      """Tests for ``remap`` with its default and custom visit/enter/exit hooks."""
      # TODO: test namedtuples and other immutable containers

      def test_basic_clone(self):
          """Remapping without callbacks yields a structure equal to the input."""
          orig = {"a": "b", "c": [1, 2]}
          assert orig == remap(orig)

          orig2 = [{1: 2}, {"a": "b", "c": [1, 2, {"cat": "dog"}]}]
          assert orig2 == remap(orig2)

      def test_empty(self):
          """Empty list, dict and set each remap to an equal empty container."""
          assert [] == remap([])
          assert {} == remap({})
          assert set() == remap(set())

      def test_unremappable(self):
          """A bare ``object()`` as the root raises TypeError."""
          obj = object()
          with pytest.raises(TypeError):
              remap(obj)

      def test_basic_upper(self):
          """A visit callback returning ``(key, value)`` can rename keys."""
          orig = {'a': 1, 'b': object(), 'c': {'d': set()}}
          remapped = remap(orig, lambda p, k, v: (k.upper(), v))
          assert orig['a'] == remapped['A']
          assert orig['b'] == remapped['B']
          assert orig['c']['d'] == remapped['C']['D']

      def test_item_drop(self):
          """Items for which the visit callback returns False are dropped."""
          orig = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
          even_items = remap(orig, lambda p, k, v: not (v % 2))
          assert even_items == [0, 2, 4, 6, 8]

      def test_noncallables(self):
          """Passing a string as visit, enter or exit raises TypeError."""
          with pytest.raises(TypeError):
              remap([], visit='test')
          with pytest.raises(TypeError):
              remap([], enter='test')
          with pytest.raises(TypeError):
              remap([], exit='test')

      def test_sub_selfref(self):
          """Remap-and-compare of a nested self-referential list raises RuntimeError."""
          coll = [0, 1, 2, 3]
          sub = []
          sub.append(sub)
          coll.append(sub)
          with pytest.raises(RuntimeError):
              # if equal, should recurse infinitely
              assert coll == remap(coll)

      def test_root_selfref(self):
          """Same expectation when the root list or dict contains itself."""
          selfref = [0, 1, 2, 3]
          selfref.append(selfref)
          with pytest.raises(RuntimeError):
              assert selfref == remap(selfref)

          selfref2 = {}
          selfref2['self'] = selfref2
          with pytest.raises(RuntimeError):
              assert selfref2 == remap(selfref2)

      def test_duperef(self):
          """Two references to one list stay one shared object, distinct from the input."""
          val = ['hello']
          duperef = [val, val]
          remapped = remap(duperef)
          assert remapped[0] is remapped[1]
          assert remapped[0] is not duperef[0]

      def test_namedtuple(self):
          """TODO: this fails right now because namedtuples' __new__ is
          overridden to accept arguments. remap's default_enter tries
          to create an empty namedtuple and gets a TypeError.

          Could make it so that immutable types actually don't create a
          blank new parent and instead use the old_parent as a
          placeholder, creating a new one at exit-time from the value's
          __class__ (how default_exit works now). But even then it would
          have to *args in the values, as namedtuple constructors don't
          take an iterable.
          """
          Point = namedtuple('Point', 'x y')
          point_map = {'origin': [Point(0, 0)]}

          with pytest.raises(TypeError):
              remapped = remap(point_map)
              # only reached once remap stops raising for namedtuples
              assert isinstance(remapped['origin'][0], Point)

      def test_path(self):
          """visit, enter and exit receive the parent path; path + (key,) locates the item."""
          path_map = {}

          # test visit's path
          target_str = 'test'
          orig = [[[target_str]]]
          ref_path = (0, 0, 0)

          def visit(path, key, value):
              if value is target_str:
                  path_map['target_str'] = path + (key,)
              return key, value

          remapped = remap(orig, visit=visit)

          assert remapped == orig
          assert path_map['target_str'] == ref_path

          # test enter's path
          target_obj = object()
          orig = {'a': {'b': {'c': {'d': ['e', target_obj, 'f']}}}}
          ref_path = ('a', 'b', 'c', 'd', 1)

          def enter(path, key, value):
              if value is target_obj:
                  path_map['target_obj'] = path + (key,)
              return default_enter(path, key, value)

          remapped = remap(orig, enter=enter)

          assert remapped == orig
          assert path_map['target_obj'] == ref_path

          # test exit's path
          target_set = frozenset([1, 7, 3, 8])
          orig = [0, 1, 2, [3, 4, [5, target_set]]]
          ref_path = (3, 2, 1)

          def exit(path, key, old_parent, new_parent, new_items):
              if old_parent is target_set:
                  path_map['target_set'] = path + (key,)
              return default_exit(path, key, old_parent, new_parent, new_items)

          remapped = remap(orig, exit=exit)

          assert remapped == orig
          assert path_map['target_set'] == ref_path

      def test_reraise_visit(self):
          """visit errors propagate by default; with reraise_visit=False the item is kept."""
          root = {'A': 'b', 1: 2}
          # int keys have no .lower(), so visiting key 1 raises AttributeError
          key_to_lower = lambda p, k, v: (k.lower(), v)
          with pytest.raises(AttributeError):
              remap(root, key_to_lower)

          remapped = remap(root, key_to_lower, reraise_visit=False)
          assert remapped['a'] == 'b'
          assert remapped[1] == 2

      def test_drop_nones(self):
          """A visit callback can strip None values at every nesting level."""
          orig = {'a': 1, 'b': None, 'c': [3, None, 4, None]}
          ref = {'a': 1, 'c': [3, 4]}
          drop_none = lambda p, k, v: v is not None
          remapped = remap(orig, visit=drop_none)
          assert remapped == ref

          orig = [None] * 100
          remapped = remap(orig, drop_none)
          assert not remapped

      def test_dict_to_omd(self):
          """An enter callback can swap every dict for an OMD."""
          def enter(path, key, value):
              if isinstance(value, dict):
                  return OMD(), sorted(value.items())
              return default_enter(path, key, value)

          orig = [{'title': 'Wild Palms',
                   'ratings': {1: 1, 2: 3, 3: 5, 4: 6, 5: 3}},
                  {'title': 'Twin Peaks',
                   'ratings': {1: 3, 2: 2, 3: 8, 4: 12, 5: 15}}]

          remapped = remap(orig, enter=enter)

          assert remapped == orig
          assert isinstance(remapped[0], OMD)
          assert isinstance(remapped[0]['ratings'], OMD)
          assert isinstance(remapped[1], OMD)
          assert isinstance(remapped[1]['ratings'], OMD)

      def test_sort_all_lists(self):
          """An exit callback can sort every list it builds, inner lists first."""
          def exit(path, key, old_parent, new_parent, new_items):
              # NB: in this case, I'd normally use *a, **kw
              ret = default_exit(path, key, old_parent, new_parent, new_items)
              if isinstance(ret, list):
                  ret.sort()
              return ret

          # NB: Airplane model numbers (Boeing and Airbus)
          orig = [[[7, 0, 7],
                   [7, 2, 7],
                   [7, 7, 7],
                   [7, 3, 7]],
                  [[3, 8, 0],
                   [3, 2, 0],
                   [3, 1, 9],
                   [3, 5, 0]]]
          ref = [[[0, 2, 3],
                  [0, 3, 5],
                  [0, 3, 8],
                  [1, 3, 9]],
                 [[0, 7, 7],
                  [2, 7, 7],
                  [3, 7, 7],
                  [7, 7, 7]]]

          remapped = remap(orig, exit=exit)
          assert remapped == ref

      def test_collector_pattern(self):
          """An enter callback can gather data as a side effect of traversal."""
          all_interests = set()

          def enter(path, key, value):
              try:
                  all_interests.update(value['interests'])
              except (KeyError, TypeError):
                  # best-effort: lists and strings reject a str index
                  # (TypeError); a dict may lack the key (KeyError)
                  pass
              return default_enter(path, key, value)

          orig = [{'name': 'Kate',
                   'interests': ['theater', 'manga'],
                   'dads': [{'name': 'Chris',
                             'interests': ['biking', 'python']}]},
                  {'name': 'Avery',
                   'interests': ['museums', 'pears'],
                   'dads': [{'name': 'Kurt',
                             'interests': ['python', 'recursion']}]}]

          ref = set(['python', 'recursion', 'biking', 'museums',
                     'pears', 'theater', 'manga'])

          remap(orig, enter=enter)
          assert all_interests == ref

      def test_add_length(self):
          """An exit callback can add a derived key to each finished dict."""
          def exit(path, key, old_parent, new_parent, new_items):
              ret = default_exit(path, key, old_parent, new_parent, new_items)
              try:
                  ret['review_length'] = len(ret['review'])
              except (KeyError, TypeError):
                  # best-effort: containers without a 'review' entry are
                  # returned untouched
                  pass
              return ret

          orig = {'Star Trek':
                  {'TNG': {'stars': 10,
                           'review': "Episodic AND deep. <3 Data."},
                   'DS9': {'stars': 8.5,
                           'review': "Like TNG, but with a story and no Data."},
                   'ENT': {'stars': None,
                           'review': "Can't review what you can't watch."}},
                  'Babylon 5': {'stars': 6,
                                'review': "Sophomoric, like a bitter laugh."},
                  'Dr. Who': {'stars': None,
                              'review': "800 episodes is too many to review."}}

          remapped = remap(orig, exit=exit)
          assert (remapped['Star Trek']['TNG']['review_length']
                  < remapped['Star Trek']['DS9']['review_length'])

      def test_prepop(self):
          """Demonstrating normalization and ID addition through prepopulating
          the objects with an enter callback.
          """
          base_obj = {'name': None,
                      'rank': None,
                      'id': 1}

          def enter(path, key, value):
              new_parent, new_items = default_enter(path, key, value)
              try:
                  new_parent.update(base_obj)
              except AttributeError:
                  # best-effort: only parents with an update() method
                  # (the dicts here) are prepopulated
                  pass
              else:
                  # advance the ID only after a successful prepopulation
                  base_obj['id'] += 1
              return new_parent, new_items

          orig = [{'name': 'Firefox', 'rank': 1},
                  {'name': 'Chrome', 'rank': 2},
                  {'name': 'IE'}]
          ref = [{'name': 'Firefox', 'rank': 1, 'id': 1},
                 {'name': 'Chrome', 'rank': 2, 'id': 2},
                 {'name': 'IE', 'rank': None, 'id': 3}]

          remapped = remap(orig, enter=enter)
          assert remapped == ref

      def test_remap_set(self):
          """set and frozenset roots remap to equal values."""
          # explicit test for sets to make sure #84 is covered
          s = set([1, 2, 3])
          assert remap(s) == s

          fs = frozenset([1, 2, 3])
          assert remap(fs) == fs

      def test_remap_file(self):
          """A structure holding a file object remaps equal while the file
          is fresh, after it has been read, and after it has been closed.
          """
          with open(CUR_PATH, 'rb') as f:
              x = {'a': [1, 2, 3], 'f': [f]}
              assert remap(x) == x
              f.read()
              assert remap(x) == x
              # deliberate early close: the closed-file case is the point
              f.close()  # see #146
              assert remap(x) == x
  class TestGetPath(object):
      """Checks ``get_path`` lookups given as tuples and as dotted strings."""

      def test_depth_one(self):
          """Single-step lookups into a list and into a dict."""
          as_list = ['test']
          for path in ((0,), '0'):
              assert get_path(as_list, path) == 'test'

          as_dict = {'key': 'value'}
          for path in (('key',), 'key'):
              assert get_path(as_dict, path) == 'value'

      def test_depth_two(self):
          """Two-step lookup: a dict key followed by a list index."""
          nested = {'key': ['test']}
          for path in (('key', 0), 'key.0'):
              assert get_path(nested, path) == 'test'
  def test_research():
      """research() rejects query=None, reports matches, and handles query errors."""
      def value_is_a(path, key, value):
          return value == 'a'

      def always_raises(path, key, value):
          raise RuntimeError()

      # query=None is rejected with TypeError
      with pytest.raises(TypeError):
          research({}, query=None)

      tree = {'a': 'a'}

      matches = research(tree, query=value_is_a)
      assert len(matches) == 1
      assert matches[0] == (('a',), 'a')

      # with reraise=True the query's own exception surfaces
      with pytest.raises(RuntimeError):
          research(tree, always_raises, reraise=True)

      # empty results with default, reraise=False
      assert research(tree, always_raises) == []
  def test_backoff_basic():
      """backoff(start, stop) with default settings gives the expected schedules."""
      from boltons.iterutils import backoff

      expectations = [
          ((1, 16), [1.0, 2.0, 4.0, 8.0, 16.0]),
          ((1, 1), [1.0]),
          ((2, 15), [2.0, 4.0, 8.0, 15.0]),
      ]
      for args, expected in expectations:
          assert backoff(*args) == expected
  def test_backoff_repeat():
      """With count='repeat' and start == stop, the iterator keeps yielding 5."""
      from boltons.iterutils import backoff_iter

      wanted = 1000
      source = backoff_iter(5, 5, count='repeat')

      # pull exactly `wanted` values; the test never drains the iterator
      fives = []
      while len(fives) < wanted:
          fives.append(next(source))

      assert fives == [5] * 1000
  def test_backoff_zero_start():
      """A start of 0 yields 0.0 first and then continues from 1.0."""
      from boltons.iterutils import backoff

      for stop, expected in ((16, [0.0, 1.0, 2.0, 4.0, 8.0, 16.0]),
                             (15, [0.0, 1.0, 2.0, 4.0, 8.0, 15.0])):
          assert backoff(0, stop) == expected

      # smaller growth factor; values rounded to two places for comparison
      raw = backoff(0, 2.9, factor=1.2)
      slow_backoff = [round(step, 2) for step in raw]
      assert slow_backoff == [0.0, 1.0, 1.2, 1.44, 1.73, 2.07, 2.49, 2.9]
  def test_backoff_validation():
      """Each of these argument combinations must raise ValueError."""
      from boltons.iterutils import backoff

      rejected_calls = [
          ((8, 2), {}),               # stop below start
          ((1, 0), {}),               # stop of zero
          ((-1, 10), {}),             # negative start
          ((2, 8), {'factor': 0}),    # factor of zero
          ((2, 8), {'jitter': 20}),   # jitter of 20
      ]
      for args, kwargs in rejected_calls:
          with pytest.raises(ValueError):
              backoff(*args, **kwargs)
  def test_backoff_jitter():
      """Jitter keeps the schedule length and moves values in the expected direction.

      Positive jitter must never exceed the unjittered value, negative
      jitter must never fall below it, and a jitter of -0.0 must leave
      the schedule unchanged.
      """
      from boltons.iterutils import backoff

      start, stop = 1, 256

      unjittered = backoff(start, stop)
      jittered = backoff(start, stop, jitter=True)
      assert len(unjittered) == len(jittered)
      # all() is essential: asserting the bare list would always pass,
      # because a non-empty list is truthy whatever it contains.
      assert all(u >= j for u, j in zip(unjittered, jittered))

      neg_jittered = backoff(start, stop, jitter=-0.01)
      assert len(unjittered) == len(neg_jittered)
      assert all(u <= j for u, j in zip(unjittered, neg_jittered))

      o_jittered = backoff(start, stop, jitter=-0.0)
      assert len(unjittered) == len(o_jittered)
      assert all(u == j for u, j in zip(unjittered, o_jittered))

      nonconst_jittered = backoff(stop, stop, count=5, jitter=True)
      assert len(nonconst_jittered) == 5
      # no two should be equal realistically
      assert len(set(nonconst_jittered)) == 5
  def test_guiderator():
      """GUIDerator yields non-empty hex strings of the configured size."""
      import string
      from boltons.iterutils import GUIDerator

      default_iter = GUIDerator()
      first_guid = next(default_iter)

      assert first_guid
      assert len(first_guid) == default_iter.size
      assert all(ch in string.hexdigits for ch in first_guid)

      # consecutive values must differ
      second_guid = next(default_iter)
      assert first_guid != second_guid

      # custom size
      sized_iter = GUIDerator(size=26)
      assert len(next(sized_iter)) == 26
  def test_seqguiderator():
      """SequentialGUIDerator yields non-empty hex strings of the configured size."""
      import string
      from boltons.iterutils import SequentialGUIDerator as GUIDerator

      default_iter = GUIDerator()
      first_guid = next(default_iter)

      assert first_guid
      assert len(first_guid) == default_iter.size
      assert all(ch in string.hexdigits for ch in first_guid)

      # consecutive values must differ
      second_guid = next(default_iter)
      assert first_guid != second_guid

      # custom size, checked on 10000 freshly built iterators
      for _ in range(10000):
          sized_iter = GUIDerator(size=26)
          assert len(next(sized_iter)) == 26
  def test_chunked_bytes():
      """chunked() splits a bytes input into size-2 pieces."""
      # see #231
      from boltons.iterutils import chunked

      # either the text or the bytes spelling of the chunks is accepted
      acceptable = (['12', '3'], [b'12', b'3'])
      pieces = chunked(b'123', 2)
      assert pieces in acceptable
  def test_chunk_ranges():
      """chunk_ranges() yields the expected (start, end) pairs for each setup."""
      from boltons.iterutils import chunk_ranges

      # (keyword arguments, expected ranges)
      cases = [
          (dict(input_offset=10, input_size=10, chunk_size=5),
           [(10, 15), (15, 20)]),
          (dict(input_offset=10, input_size=10, chunk_size=5, overlap_size=1),
           [(10, 15), (14, 19), (18, 20)]),
          (dict(input_offset=10, input_size=10, chunk_size=5, overlap_size=2),
           [(10, 15), (13, 18), (16, 20)]),
          (dict(input_offset=4, input_size=15, chunk_size=5, align=False),
           [(4, 9), (9, 14), (14, 19)]),
          (dict(input_offset=4, input_size=15, chunk_size=5, align=True),
           [(4, 5), (5, 10), (10, 15), (15, 19)]),
          (dict(input_offset=2, input_size=15, chunk_size=5, overlap_size=1,
                align=False),
           [(2, 7), (6, 11), (10, 15), (14, 17)]),
          (dict(input_offset=2, input_size=15, chunk_size=5, overlap_size=1,
                align=True),
           [(2, 5), (4, 9), (8, 13), (12, 17)]),
          (dict(input_offset=3, input_size=15, chunk_size=5, overlap_size=1,
                align=True),
           [(3, 5), (4, 9), (8, 13), (12, 17), (16, 18)]),
          (dict(input_offset=3, input_size=2, chunk_size=5, overlap_size=1,
                align=True),
           [(3, 5)]),
      ]
      for kwargs, expected in cases:
          assert list(chunk_ranges(**kwargs)) == expected
  def test_lstrip():
      """lstrip() removes leading occurrences of the given value only."""
      from boltons.iterutils import lstrip

      expected = [1, 0, 2, 0, 3, 0]
      single_lead = [0, 1, 0, 2, 0, 3, 0]
      triple_lead = [0, 0, 0, 1, 0, 2, 0, 3, 0]

      assert lstrip(single_lead, 0) == expected
      assert lstrip(triple_lead, 0) == expected
      assert lstrip([]) == []
  def test_rstrip():
      """rstrip() removes trailing occurrences of the given value only."""
      from boltons.iterutils import rstrip

      expected = [0, 1, 0, 2, 0, 3]
      single_tail = [0, 1, 0, 2, 0, 3, 0]
      triple_tail = [0, 1, 0, 2, 0, 3, 0, 0, 0]

      assert rstrip(single_tail, 0) == expected
      assert rstrip(triple_tail, 0) == expected
      assert rstrip([]) == []
  def test_strip():
      """strip() removes the given value from both ends, keeping interior ones."""
      from boltons.iterutils import strip

      expected = [1, 0, 2, 0, 3]
      single_pad = [0, 1, 0, 2, 0, 3, 0]
      triple_pad = [0, 0, 0, 1, 0, 2, 0, 3, 0, 0, 0]

      assert strip(single_pad, 0) == expected
      assert strip(triple_pad, 0) == expected
      assert strip([]) == []
  421. assert strip([]) == []