test_api.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. import json
  2. import os
  3. import pathlib
  4. import platform
  5. import shutil
  6. import sys
  7. import tarfile
  8. import time
  9. import zipfile
  10. from datetime import datetime
  11. from tempfile import TemporaryDirectory
  12. import pytest
  13. import conda_package_handling
  14. import conda_package_handling.tarball
  15. from conda_package_handling import api, exceptions
  16. this_dir = os.path.dirname(__file__)
  17. data_dir = os.path.join(this_dir, "data")
  18. version_file = pathlib.Path(this_dir).parent / "src" / "conda_package_handling" / "__init__.py"
  19. test_package_name = "mock-2.0.0-py37_1000"
  20. test_package_name_2 = "cph_test_data-0.0.1-0"
  21. @pytest.mark.skipif(
  22. bool(os.environ.get("GITHUB_ACTIONS", False)), reason="Fails on GitHub Actions"
  23. )
  24. @pytest.mark.skipif(not version_file.exists(), reason=f"Could not find {version_file}")
  25. def test_correct_version():
  26. """
  27. Prevent accidentally running tests against a globally installed different version.
  28. """
  29. assert conda_package_handling.__version__ in version_file.read_text()
  30. def test_api_extract_tarball_implicit_path(testing_workdir):
  31. tarfile = os.path.join(data_dir, test_package_name + ".tar.bz2")
  32. local_tarfile = os.path.join(testing_workdir, os.path.basename(tarfile))
  33. shutil.copy2(tarfile, local_tarfile)
  34. api.extract(local_tarfile)
  35. assert os.path.isfile(os.path.join(testing_workdir, test_package_name, "info", "index.json"))
  36. def test_api_tarball_details(testing_workdir):
  37. tarfile = os.path.join(data_dir, test_package_name + ".tar.bz2")
  38. results = api.get_pkg_details(tarfile)
  39. assert results["size"] == 106576
  40. assert results["md5"] == "0f9cce120a73803a70abb14bd4d4900b"
  41. assert results["sha256"] == "34c659b0fdc53d28ae721fd5717446fb8abebb1016794bd61e25937853f4c29c"
  42. def test_api_conda_v2_details(testing_workdir):
  43. condafile = os.path.join(data_dir, test_package_name + ".conda")
  44. results = api.get_pkg_details(condafile)
  45. assert results["size"] == 113421
  46. assert results["sha256"] == "181ec44eb7b06ebb833eae845bcc466ad96474be1f33ee55cab7ac1b0fdbbfa3"
  47. assert results["md5"] == "23c226430e35a3bd994db6c36b9ac8ae"
  48. def test_api_extract_tarball_explicit_path(testing_workdir):
  49. tarfile = os.path.join(data_dir, test_package_name + ".tar.bz2")
  50. local_tarfile = os.path.join(testing_workdir, os.path.basename(tarfile))
  51. shutil.copy2(tarfile, local_tarfile)
  52. api.extract(local_tarfile, "manual_path")
  53. assert os.path.isfile(os.path.join(testing_workdir, "manual_path", "info", "index.json"))
  54. def test_api_extract_conda_v2_implicit_path(testing_workdir):
  55. condafile = os.path.join(data_dir, test_package_name + ".conda")
  56. local_condafile = os.path.join(testing_workdir, os.path.basename(condafile))
  57. shutil.copy2(condafile, local_condafile)
  58. api.extract(local_condafile)
  59. assert os.path.isfile(os.path.join(testing_workdir, test_package_name, "info", "index.json"))
  60. def test_api_extract_conda_v2_no_destdir_relative_path(testing_workdir):
  61. cwd = os.getcwd()
  62. os.chdir(testing_workdir)
  63. try:
  64. condafile = os.path.join(data_dir, test_package_name + ".conda")
  65. local_condafile = os.path.join(testing_workdir, os.path.basename(condafile))
  66. shutil.copy2(condafile, local_condafile)
  67. condafile = os.path.basename(local_condafile)
  68. assert os.path.exists(condafile)
  69. # cli passes dest=None, prefix=None
  70. api.extract(condafile, None, prefix=None)
  71. finally:
  72. os.chdir(cwd)
  73. def test_api_extract_conda_v2_explicit_path(testing_workdir):
  74. condafile = os.path.join(data_dir, test_package_name + ".conda")
  75. local_condafile = os.path.join(testing_workdir, os.path.basename(condafile))
  76. shutil.copy2(condafile, local_condafile)
  77. api.extract(condafile, "manual_path")
  78. assert os.path.isfile(os.path.join(testing_workdir, "manual_path", "info", "index.json"))
  79. def test_api_extract_conda_v2_explicit_path_prefix(testing_workdir):
  80. tarfile = os.path.join(data_dir, test_package_name + ".conda")
  81. api.extract(tarfile, prefix=os.path.join(testing_workdir, "folder"))
  82. assert os.path.isfile(
  83. os.path.join(testing_workdir, "folder", test_package_name, "info", "index.json")
  84. )
  85. api.extract(tarfile, dest_dir="steve", prefix=os.path.join(testing_workdir, "folder"))
  86. assert os.path.isfile(os.path.join(testing_workdir, "folder", "steve", "info", "index.json"))
  87. def test_api_extract_dest_dir_and_prefix_both_abs_raises():
  88. tarfile = os.path.join(data_dir, test_package_name + ".conda")
  89. with pytest.raises(ValueError):
  90. api.extract(tarfile, prefix=os.path.dirname(tarfile), dest_dir=os.path.dirname(tarfile))
  91. def test_api_extract_info_conda_v2(testing_workdir):
  92. condafile = os.path.join(data_dir, test_package_name + ".conda")
  93. local_condafile = os.path.join(testing_workdir, os.path.basename(condafile))
  94. shutil.copy2(condafile, local_condafile)
  95. api.extract(local_condafile, "manual_path", components="info")
  96. assert os.path.isfile(os.path.join(testing_workdir, "manual_path", "info", "index.json"))
  97. assert not os.path.isdir(os.path.join(testing_workdir, "manual_path", "lib"))
  98. def check_conda_v2_metadata(condafile):
  99. with zipfile.ZipFile(condafile) as zf:
  100. d = json.loads(zf.read("metadata.json"))
  101. assert d["conda_pkg_format_version"] == 2
  102. def test_api_transmute_tarball_to_conda_v2(testing_workdir):
  103. tarfile = os.path.join(data_dir, test_package_name + ".tar.bz2")
  104. # lower compress level makes the test run much faster, even 15 is much
  105. # better than 22
  106. errors = api.transmute(tarfile, ".conda", testing_workdir, zstd_compress_level=3)
  107. assert not errors
  108. condafile = os.path.join(testing_workdir, test_package_name + ".conda")
  109. assert os.path.isfile(condafile)
  110. check_conda_v2_metadata(condafile)
  111. def test_api_transmute_tarball_info_sorts_first(testing_workdir):
  112. test_packages = [test_package_name]
  113. test_packages_with_symlinks = [test_package_name_2]
  114. if sys.platform != "win32":
  115. test_packages += test_packages_with_symlinks
  116. for test_package in test_packages:
  117. test_file = os.path.join(data_dir, test_package + ".tar.bz2")
  118. # transmute/convert doesn't re-sort files; extract to folder.
  119. api.extract(test_file, testing_workdir)
  120. out_fn = os.path.join(testing_workdir, test_package + ".tar.bz2")
  121. out = api.create(testing_workdir, None, out_fn)
  122. assert out == out_fn
  123. # info must be first
  124. with tarfile.open(out_fn, "r:bz2") as repacked:
  125. info_seen = False
  126. not_info_seen = False
  127. for member in repacked:
  128. if member.name.startswith("info"):
  129. assert (
  130. not_info_seen is False
  131. ), f"{test_package} package info/ must sort first, "
  132. f"but {[m.name for m in repacked.getmembers()]}"
  133. info_seen = True
  134. else:
  135. not_info_seen = True
  136. assert info_seen, "package had no info/ files"
  137. @pytest.mark.skipif(sys.platform == "win32", reason="windows and symlinks are not great")
  138. def test_api_transmute_to_conda_v2_contents(testing_workdir):
  139. def _walk(path):
  140. for entry in os.scandir(path):
  141. if entry.is_dir(follow_symlinks=False):
  142. yield from _walk(entry.path)
  143. continue
  144. yield entry
  145. tar_path = os.path.join(data_dir, test_package_name_2 + ".tar.bz2")
  146. conda_path = os.path.join(testing_workdir, test_package_name_2 + ".conda")
  147. api.transmute(tar_path, ".conda", testing_workdir, zstd_compress_level=3)
  148. # Verify original contents were all put in the right place
  149. pkg_tarbz2 = tarfile.open(tar_path, mode="r:bz2")
  150. info_items = [item for item in pkg_tarbz2.getmembers() if item.path.startswith("info/")]
  151. pkg_items = [item for item in pkg_tarbz2.getmembers() if not item.path.startswith("info/")]
  152. errors = []
  153. for component, expected in (("info", info_items), ("pkg", pkg_items)):
  154. with TemporaryDirectory() as root:
  155. api.extract(conda_path, root, components=component)
  156. contents = {
  157. os.path.relpath(entry.path, root): {
  158. "is_symlink": entry.is_symlink(),
  159. "target": os.readlink(entry.path) if entry.is_symlink() else None,
  160. }
  161. for entry in _walk(root)
  162. }
  163. for item in expected:
  164. if item.path not in contents:
  165. errors.append(f"'{item.path}' not found in {component} contents")
  166. continue
  167. ct = contents.pop(item.path)
  168. if item.issym():
  169. if not ct["is_symlink"] or ct["target"] != item.linkname:
  170. errors.append(
  171. f"{item.name} -> {item.linkname} incorrect in {component} contents"
  172. )
  173. elif not item.isfile():
  174. # Raise an exception rather than appending to `errors`
  175. # because getting to this point is an indication that our
  176. # test data (i.e., .tar.bz2 package) is corrupt, rather
  177. # than the `.transmute` function having problems (which is
  178. # what `errors` is meant to track). For context, conda
  179. # packages should only contain regular files and symlinks.
  180. raise ValueError(f"unexpected item '{item.path}' in test .tar.bz2")
  181. if contents:
  182. errors.append(f"extra files [{', '.join(contents)}] in {component} contents")
  183. assert not errors
  184. def test_api_transmute_conda_v2_to_tarball(testing_workdir):
  185. condafile = os.path.join(data_dir, test_package_name + ".conda")
  186. outfile = pathlib.Path(testing_workdir, test_package_name + ".tar.bz2")
  187. # one quiet=True in the test suite for coverage
  188. api.transmute(condafile, ".tar.bz2", testing_workdir, quiet=True)
  189. assert outfile.is_file()
  190. # test that no-force keeps file, and force overwrites file
  191. for force in False, True:
  192. mtime = outfile.stat().st_mtime
  193. time.sleep(2 if platform.platform() == "Windows" else 0)
  194. api.transmute(condafile, ".tar.bz2", testing_workdir, force=force)
  195. mtime2 = outfile.stat().st_mtime
  196. assert (mtime2 == mtime) != force
  197. def test_warning_when_bundling_no_metadata(testing_workdir):
  198. pass
  199. @pytest.mark.skipif(sys.platform == "win32", reason="windows and symlinks are not great")
  200. def test_create_package_with_uncommon_conditions_captures_all_content(testing_workdir):
  201. os.makedirs("src/a_folder")
  202. os.makedirs("src/empty_folder")
  203. os.makedirs("src/symlink_stuff")
  204. with open("src/a_folder/text_file", "w") as f:
  205. f.write("weee")
  206. open("src/empty_file", "w").close()
  207. os.link("src/a_folder/text_file", "src/a_folder/hardlink_to_text_file")
  208. os.symlink("../a_folder", "src/symlink_stuff/symlink_to_a")
  209. os.symlink("../empty_file", "src/symlink_stuff/symlink_to_empty_file")
  210. os.symlink("../a_folder/text_file", "src/symlink_stuff/symlink_to_text_file")
  211. with tarfile.open("pinkie.tar.bz2", "w:bz2") as tf:
  212. def add(source, target):
  213. tf.add(source, target, recursive=False)
  214. add("src/empty_folder", "empty_folder")
  215. add("src/empty_file", "empty_file")
  216. add("src/a_folder", "a_folder")
  217. add("src/a_folder/text_file", "a_folder/text_file")
  218. add("src/a_folder/hardlink_to_text_file", "a_folder/hardlink_to_text_file")
  219. add("src/symlink_stuff/symlink_to_a", "symlink_stuff/symlink_to_a")
  220. add(
  221. "src/symlink_stuff/symlink_to_empty_file",
  222. "symlink_stuff/symlink_to_empty_file",
  223. )
  224. add(
  225. "src/symlink_stuff/symlink_to_text_file",
  226. "symlink_stuff/symlink_to_text_file",
  227. )
  228. api.create("src", None, "thebrain.tar.bz2")
  229. # test against both archives created manually and those created by cph.
  230. # They should be equal in all ways.
  231. for fn in ("pinkie.tar.bz2", "thebrain.tar.bz2"):
  232. api.extract(fn)
  233. target_dir = fn[:-8]
  234. flist = [
  235. "empty_folder",
  236. "empty_file",
  237. "a_folder/text_file",
  238. "a_folder/hardlink_to_text_file",
  239. "symlink_stuff/symlink_to_a",
  240. "symlink_stuff/symlink_to_text_file",
  241. "symlink_stuff/symlink_to_empty_file",
  242. ]
  243. # no symlinks on windows
  244. if sys.platform != "win32":
  245. # not directly included but checked symlink
  246. flist.append("symlink_stuff/symlink_to_a/text_file")
  247. missing_content = []
  248. for f in flist:
  249. path_that_should_be_there = os.path.join(testing_workdir, target_dir, f)
  250. if not (
  251. os.path.exists(path_that_should_be_there)
  252. or os.path.lexists(path_that_should_be_there) # noqa
  253. ):
  254. missing_content.append(f)
  255. if missing_content:
  256. print("missing files in output package")
  257. print(missing_content)
  258. sys.exit(1)
  259. # hardlinks should be preserved, but they're currently not with libarchive
  260. # hardlinked_file = os.path.join(testing_workdir, target_dir, 'a_folder/text_file')
  261. # stat = os.stat(hardlinked_file)
  262. # assert stat.st_nlink == 2
  263. hardlinked_file = os.path.join(testing_workdir, target_dir, "empty_file")
  264. stat = os.stat(hardlinked_file)
  265. if sys.platform != "win32":
  266. assert stat.st_nlink == 1
  267. @pytest.mark.skipif(
  268. datetime.now() <= datetime(2020, 12, 1),
  269. reason="Don't understand why this doesn't behave. Punt.",
  270. )
  271. def test_secure_refusal_to_extract_abs_paths(testing_workdir):
  272. with tarfile.open("pinkie.tar.bz2", "w:bz2") as tf:
  273. open("thebrain", "w").close()
  274. tf.add(os.path.join(testing_workdir, "thebrain"), "/naughty/abs_path")
  275. try:
  276. tf.getmember("/naughty/abs_path")
  277. except KeyError:
  278. pytest.skip("Tar implementation does not generate unsafe paths in archive.")
  279. with pytest.raises(api.InvalidArchiveError):
  280. api.extract("pinkie.tar.bz2")
  281. def tests_secure_refusal_to_extract_dotdot(testing_workdir):
  282. with tarfile.open("pinkie.tar.bz2", "w:bz2") as tf:
  283. open("thebrain", "w").close()
  284. tf.add(os.path.join(testing_workdir, "thebrain"), "../naughty/abs_path")
  285. with pytest.raises(api.InvalidArchiveError):
  286. api.extract("pinkie.tar.bz2")
  287. def test_api_bad_filename(testing_workdir):
  288. with pytest.raises(ValueError):
  289. api.extract("pinkie.rar", testing_workdir)
  290. def test_details_bad_extension():
  291. with pytest.raises(ValueError):
  292. # TODO this function should not exist
  293. api.get_pkg_details("pinkie.rar")
  294. def test_convert_bad_extension(testing_workdir):
  295. api._convert("pinkie.rar", ".conda", testing_workdir)
  296. def test_convert_keyerror(tmpdir, mocker):
  297. tarfile = os.path.join(data_dir, test_package_name + ".tar.bz2")
  298. mocker.patch(
  299. "conda_package_streaming.transmute.transmute",
  300. side_effect=KeyboardInterrupt(),
  301. )
  302. # interrupted before ".conda" was created
  303. with pytest.raises(KeyboardInterrupt):
  304. api._convert(tarfile, ".conda", tmpdir)
  305. def create_file_and_raise(*args, **kwargs):
  306. out_fn = pathlib.Path(tmpdir, pathlib.Path(tarfile[: -len(".tar.bz2")] + ".conda").name)
  307. print("out fn", out_fn)
  308. out_fn.write_text("")
  309. raise KeyboardInterrupt()
  310. mocker.patch("conda_package_streaming.transmute.transmute", side_effect=create_file_and_raise)
  311. # interrupted after ".conda" was created
  312. with pytest.raises(KeyboardInterrupt):
  313. api._convert(tarfile, ".conda", tmpdir)
  314. def test_create_filelist(tmpdir, mocker):
  315. # another bad API, tested for coverage
  316. filelist = pathlib.Path(tmpdir, "filelist.txt")
  317. filelist.write_text("\n".join(["filelist.txt", "anotherfile"]))
  318. # when looking for filelist-not-found.txt
  319. with pytest.raises(FileNotFoundError):
  320. api.create(str(tmpdir), "filelist-not-found.txt", str(tmpdir / "newconda.conda"))
  321. # when adding anotherfile
  322. with pytest.raises(FileNotFoundError):
  323. api.create(str(tmpdir), str(filelist), str(tmpdir / "newconda.conda"))
  324. # unrecognized target extension
  325. with pytest.raises(ValueError):
  326. api.create(str(tmpdir), str(filelist), str(tmpdir / "newpackage.rar"))
  327. def create_file_and_raise(prefix, file_list, out_fn, *args, **kwargs):
  328. pathlib.Path(prefix, out_fn).write_text("")
  329. raise KeyboardInterrupt()
  330. mocker.patch(
  331. "conda_package_handling.conda_fmt.CondaFormat_v2.create",
  332. side_effect=create_file_and_raise,
  333. )
  334. # failure inside inner create()
  335. with pytest.raises(KeyboardInterrupt):
  336. api.create(str(tmpdir), str(filelist), str(tmpdir / "newpackage.conda"))
  337. def test_api_transmute_fail_validation(tmpdir, mocker):
  338. package = os.path.join(data_dir, test_package_name + ".conda")
  339. # this code is only called for .conda -> .tar.bz2; a streaming validate for
  340. # .tar.bz2 -> .conda would be a good idea.
  341. mocker.patch(
  342. "conda_package_handling.validate.validate_converted_files_match_streaming",
  343. return_value=(str(package), {"missing-file.txt"}, {"mismatched-size.txt"}),
  344. )
  345. errors = api.transmute(package, ".tar.bz2", tmpdir)
  346. assert errors
  347. def test_api_transmute_fail_validation_to_conda(tmpdir, mocker):
  348. package = os.path.join(data_dir, test_package_name + ".tar.bz2")
  349. mocker.patch(
  350. "conda_package_handling.validate.validate_converted_files_match_streaming",
  351. return_value=(str(package), {"missing-file.txt"}, {"mismatched-size.txt"}),
  352. )
  353. errors = api.transmute(package, ".conda", tmpdir, zstd_compress_level=3)
  354. assert errors
  355. def test_api_transmute_fail_validation_2(tmpdir, mocker):
  356. package = os.path.join(data_dir, test_package_name + ".conda")
  357. tmptarfile = tmpdir / pathlib.Path(package).name
  358. shutil.copy(package, tmptarfile)
  359. mocker.patch(
  360. "conda_package_handling.validate.validate_converted_files_match_streaming",
  361. side_effect=Exception("not today"),
  362. )
  363. # run with out_folder=None
  364. errors = api.transmute(str(tmptarfile), ".tar.bz2")
  365. assert errors
  366. def test_api_translates_exception(mocker, tmpdir):
  367. from conda_package_streaming.extract import exceptions as cps_exceptions
  368. tarfile = os.path.join(data_dir, test_package_name + ".tar.bz2")
  369. # translates their exception to our exception of the same name
  370. mocker.patch(
  371. "conda_package_streaming.package_streaming.stream_conda_component",
  372. side_effect=cps_exceptions.CaseInsensitiveFileSystemError(),
  373. )
  374. # should this be exported from the api or inherit from InvalidArchiveError?
  375. with pytest.raises(exceptions.CaseInsensitiveFileSystemError):
  376. api.extract(tarfile, tmpdir)