Source code for mendevi.download.decompress
"""Decompress a file."""
import logging
import lzma
import pathlib
import re
import subprocess
import tqdm
from context_verbose import Printer
def _get_size(compressed_file: pathlib.Path) -> int:
"""Try to get the decompressed size with 'xz'."""
# this method should be improved
try:
out = subprocess.run(
["xz", "--robot", "--list", str(compressed_file)], check=True, capture_output=True,
)
except FileNotFoundError as err:
logging.getLogger(__name__).warning("please install xz %s", err)
sizes = re.findall(br"\d+", out.stdout)
return max(map(int, sizes), default=0)
[docs]
def decompress(compressed_file: pathlib.Path) -> pathlib.Path:
"""Decompress a *.xz file in the same folder."""
assert isinstance(compressed_file, pathlib.Path), compressed_file.__class__.__name__
assert compressed_file.suffix == ".xz", compressed_file
decompressed_file = compressed_file.parent / compressed_file.stem
# shortcut
size = None
if decompressed_file.exists():
size = _get_size(compressed_file)
if decompressed_file.stat().st_size == size:
return decompressed_file
# decompress
with Printer(f"Decompress {compressed_file.name!r}...", color="green") as prt:
if size is None:
size = _get_size(compressed_file)
with (
tqdm.tqdm(
dynamic_ncols=True,
leave=True,
smoothing=1e-6,
total=round(size*1e-6, 1),
unit="Mo",
) as load,
lzma.open(compressed_file, "r") as src,
decompressed_file.open("wb") as dst,
):
while data := src.read(1_000_000):
dst.write(data)
load.total = max(load.total, load.n + len(data)*1e-6)
load.update(len(data)*1e-6)
prt.print_time()
return decompressed_file