Xz is pretty slow compared to other compression formats. It would be really cool if python-xz could be parallelized such that it prefetches the next blocks and decodes them in parallel. I think this would be a helpful feature and unique selling point for python-xz. I don't think there is a parallelized XZ decoder for Python at all, or is there?
I'm doing something similar in indexed_bzip2. But I am aware that this adds complexity and problems:
- It probably won't mesh well with write- and read-write-opened files, resulting in an obnoxious special case for read-opened files. Then again, xz can compress in parallel, so maybe that could also be possible.
- How to handle bounded memory? Blocks can't be decoded in parallel if the decompressed results don't fit in memory. But the decompressed block sizes are known and could therefore be used to limit the parallelism. One edge case would be one or multiple blocks that don't fit into memory, not even alone. The easy workaround would be to fall back to a serial implementation, but a more sophisticated solution should be able to handle partial block reads inside the parallel decoder framework.
- When using multiprocessing as opposed to multithreading, there might be problems with opening the file multiple times, e.g., on Windows. Also, file objects generally cannot be reopened, and pickling file objects to send them to other processes isn't possible either; sharing them instead could introduce race conditions.
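For illustration, a minimal repro of the pickling problem (the file name is just a placeholder):

```python
import pickle

with open('large.xz', 'rb') as file:
    # Raises TypeError: cannot pickle '_io.BufferedReader' object
    pickle.dumps(file)
```

This is why the sketch below passes only the file *name* to the worker processes and lets each worker reopen the file itself.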
I implemented a very rudimentary sketch on top of python-xz using multiprocessing.pool.Pool. It has the same design as indexed_bzip2, which is:
- A least-recently-used block cache containing futures to the decompressed block contents.
- A prefetcher inside the read method, which tracks the last accessed blocks and adds the next n blocks to the block cache when it detects sequential access.
- The read method checks the block cache, submits new blocks for decoding as necessary, and returns the concatenated block results.
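Usage then mirrors a regular file object. A minimal sketch, assuming the module below is saved as parallel_xz_decoder.py and that large.xz exists:

```python
import os

from parallel_xz_decoder import ParallelXZReader  # the sketch attached below

size = 0
with ParallelXZReader('large.xz', os.cpu_count()) as file:  # 'large.xz' is a placeholder
    while chunk := file.read(8 * 1024 * 1024):
        size += len(chunk)
print("Decompressed", size, "bytes")
```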
With this, I was able to speed up the decompression of a 3.1 GiB xz file (4 GiB decompressed) consisting of 171 blocks by a factor of ~7 on an 8-core CPU (16 virtual cores):
- serial: Reading 4294967296 B took: 187.482s
- parallel: Reading 4294967296 B took: 26.890s
However, at this point I'm becoming uncertain whether this might be easier to implement inside python-xz itself or whether the wrapper is a sufficient ad-hoc solution. It only uses public methods and members of XZFile, so it should be stable across non-major version changes.
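For reference, a minimal sketch of the public XZFile members the wrapper builds on ('large.xz' again being a placeholder):

```python
import xz

with xz.open('large.xz', 'rb') as fileobj:
    # block_boundaries lists the uncompressed start offset of each block, so
    # differences of consecutive entries yield the decompressed block sizes
    # (the size of the last block is not directly listed).
    boundaries = fileobj.block_boundaries
    blockSizes = [b - a for a, b in zip(boundaries, boundaries[1:])]
    fileobj.seek(boundaries[-1])  # jump to the start of the last block
    data = fileobj.read(64)       # decode only what is needed
```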
Rudimentary, unfinished sketch / proof of concept:
decompress-xz-parallel.py:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import bisect
import io
import lzma
import math
import multiprocessing.pool
import os
import resource
import sys
import time
from typing import Iterable

import xz

from parallel_xz_decoder import ParallelXZReader


def benchmark_python_xz_serial(filename):
    print("== Benchmark serial xz file decompression ==")
    size = 0
    t0 = time.time()
    with xz.open(filename, 'rb') as file:
        t1 = time.time()
        while True:
            readSize = len(file.read(32 * 1024 * 1024))
            if readSize == 0:
                break
            size += readSize
            # Print intermediate progress and memory usage every 5 seconds.
            if time.time() - t1 > 5:
                t1 = time.time()
                print(f"{t1 - t0:.2f}s {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024} MiB RSS")
        file.close()
    t1 = time.time()
    print(f"Reading {size} B took: {t1-t0:.3f}s")
def test_python_xz_parallel(filename):
    print("== Test parallel xz file decompression ==")
    size = 0
    t0 = time.time()
    with xz.open(filename, 'rb') as file, ParallelXZReader(filename, os.cpu_count()) as pfile:
        t1 = time.time()
        while True:
            readData = file.read(8 * 1024 * 1024)
            parallelReadData = pfile.read(len(readData))
            print("Read from:", file, pfile)
            # Compare the parallel result against the serial reference implementation.
            if readData != parallelReadData:
                print("inequal", len(readData), len(parallelReadData))
            assert readData == parallelReadData
            readSize = len(readData)
            if readSize == 0:
                break
            size += readSize
            if time.time() - t1 > 5:
                t1 = time.time()
                print(f"{t1 - t0:.2f}s {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024} MiB RSS")
        file.close()
    t1 = time.time()
    print(f"Reading {size} B took: {t1-t0:.3f}s")
def benchmark_python_xz_parallel(filename):
    print("== Benchmark parallel xz file decompression ==")
    size = 0
    t0 = time.time()
    with ParallelXZReader(filename, os.cpu_count()) as file:
        t1 = time.time()
        while True:
            readSize = len(file.read(8 * 1024 * 1024))
            if readSize == 0:
                break
            size += readSize
            if time.time() - t1 > 5:
                t1 = time.time()
                print(f"{t1 - t0:.2f}s {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024} MiB RSS")
        file.close()
    t1 = time.time()
    print(f"Reading {size} B took: {t1-t0:.3f}s")
if __name__ == '__main__':
    print("xz version:", xz.__version__)
    filename = sys.argv[1]
    benchmark_python_xz_serial(filename)
    test_python_xz_parallel(filename)
    benchmark_python_xz_parallel(filename)
    # TODO test with multistream xz
```

parallel_xz_decoder.py:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import bisect
import io
import lzma
import math
import multiprocessing.pool
import os
import resource
import sys
import time
from typing import Iterable, List

import xz

# TODO Add tests for everything


def overrides(parentClass):
    """Simple decorator that checks that a method with the same name exists in the parent class"""

    def overrider(method):
        assert method.__name__ in dir(parentClass)
        assert callable(getattr(parentClass, method.__name__))
        return method

    return overrider
class LruCache(dict):
    def __init__(self, size: int = 10):
        self.size = size
        self.lastUsed: List[int] = []

    def _refresh(self, key):
        # Move the key to the back of the eviction queue.
        if key in self.lastUsed:
            self.lastUsed.remove(key)
        self.lastUsed.append(key)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._refresh(key)
        while super().__len__() > self.size:
            super().__delitem__(self.lastUsed.pop(0))

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self._refresh(key)
        return value
class Prefetcher:
    def __init__(self, memorySize):
        self.lastFetched = []
        self.memorySize = memorySize

    def fetch(self, value):
        # Remember the last accessed block numbers (most recent last).
        if value in self.lastFetched:
            self.lastFetched.remove(value)
        self.lastFetched.append(value)
        while len(self.lastFetched) > self.memorySize:
            self.lastFetched.pop(0)

    def prefetch(self, maximumToPrefetch) -> Iterable:
        if not self.lastFetched or maximumToPrefetch <= 0:
            return []

        # Count how many of the last accesses were consecutive.
        consecutiveCount = 0
        values = self.lastFetched[::-1]
        for i, j in zip(values[0:-1], values[1:]):
            if i == j + 1:
                consecutiveCount += 1
            else:
                break

        # I want an exponential progression like: logStep**consecutiveCount with the boundary conditions:
        # logStep**0 = 1 (mathematically true for any logStep because consecutiveCount was chosen to fit)
        # logStep**maxConsecutiveCount = maximumToPrefetch
        # => logStep = exp(ln(maximumToPrefetch)/maxConsecutiveCount)
        # => logStep**consecutiveCount = exp(ln(maximumToPrefetch) * consecutiveCount/maxConsecutiveCount)
        prefetchCount = int(round(math.exp(math.log(maximumToPrefetch) * consecutiveCount / (self.memorySize - 1))))
        return range(self.lastFetched[-1] + 1, self.lastFetched[-1] + 1 + prefetchCount)
class ParallelXZReader(io.BufferedIOBase):
    # TODO test if a simple thread pool would also parallelize equally well
    """Uses a process pool to prefetch and cache decoded xz blocks"""

    def __init__(self, filename, parallelization):
        print("Parallelize:", parallelization)
        self.parallelization = parallelization - 1  # keep one core for on-demand decompression
        self.pool = multiprocessing.pool.Pool(self.parallelization)
        self.offset = 0
        self.filename = filename
        self.fileobj = xz.open(filename, 'rb')
        self.blockCache = LruCache(2 * parallelization)
        self.prefetcher = Prefetcher(4)
        assert self.fileobj.seekable() and self.fileobj.readable()
        print(self.fileobj.stream_boundaries)
        print(self.fileobj.block_boundaries)  # contains uncompressed offsets and therefore sizes -> perfect!

    def _findBlock(self, offset: int):
        # Returns the block index, the offset inside that block, and the block's decompressed size.
        blockNumber = bisect.bisect_right(self.fileobj.block_boundaries, offset)
        print("Look for offset:", offset, "found:", blockNumber)
        if blockNumber <= 0:
            return blockNumber - 1, 0, 0
        if blockNumber >= len(self.fileobj.block_boundaries):
            return blockNumber - 1, offset - self.fileobj.block_boundaries[blockNumber - 1], -1

        blockSize = self.fileobj.block_boundaries[blockNumber] - self.fileobj.block_boundaries[blockNumber - 1]
        offsetInBlock = offset - self.fileobj.block_boundaries[blockNumber - 1]
        assert offsetInBlock >= 0
        assert offsetInBlock < blockSize
        return blockNumber - 1, offsetInBlock, blockSize

    def _blockSize(self, blockNumber):
        blockNumber += 1
        if blockNumber >= len(self.fileobj.block_boundaries) or blockNumber <= 0:
            return -1
        return self.fileobj.block_boundaries[blockNumber] - self.fileobj.block_boundaries[blockNumber - 1]

    @staticmethod
    def _decodeBlock(filename, offset, size):
        # Runs in a worker process: reopen the file so that no file object has to be pickled.
        with xz.open(filename, 'rb') as file:
            file.seek(offset)
            return file.read(size)

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        self.close()
    @overrides(io.BufferedIOBase)
    def close(self) -> None:
        self.fileobj.close()
        self.pool.close()

    @overrides(io.BufferedIOBase)
    def fileno(self) -> int:
        # This is a virtual Python-level file object and therefore does not have a valid OS file descriptor!
        raise io.UnsupportedOperation()

    @overrides(io.BufferedIOBase)
    def seekable(self) -> bool:
        return True

    @overrides(io.BufferedIOBase)
    def readable(self) -> bool:
        return True

    @overrides(io.BufferedIOBase)
    def writable(self) -> bool:
        return False
def read(self, size: int = -1) -> bytes:
print("\nread", size, "from", self.offset)
result = bytes()
blocks = []
blockNumber, firstBlockOffset, blockSize = self._findBlock(self.offset)
print("Found block:", blockNumber, blockSize, firstBlockOffset)
if blockNumber >= len(self.fileobj.block_boundaries) or blockNumber < 0:
return result
pendingBlocks = sum(not block.ready() for block in self.blockCache.values())
availableSize = blockSize - firstBlockOffset
while True:
# Fetch Block
self.prefetcher.fetch(blockNumber)
if blockNumber in self.blockCache:
fetchedBlock = self.blockCache[blockNumber]
else:
print("fetch block:", blockNumber, "sized", self._blockSize(blockNumber))
fetchedBlock = self.pool.apply_async(
ParallelXZReader._decodeBlock,
(self.filename, self.fileobj.block_boundaries[blockNumber], self._blockSize(blockNumber)),
)
self.blockCache[blockNumber] = fetchedBlock
pendingBlocks += 1
blocks.append(fetchedBlock)
if size <= availableSize or blockSize == -1:
break
size -= availableSize
self.offset += availableSize
# Get metadata for next block
blockNumber += 1
if blockNumber >= len(self.fileobj.block_boundaries):
break
blockSize = self._blockSize(blockNumber)
offsetInBlock = self.offset - self.fileobj.block_boundaries[blockNumber - 1]
availableSize = blockSize - offsetInBlock
# TODO apply prefetch suggestion
maxToPrefetch = self.parallelization - pendingBlocks
toPrefetch = self.prefetcher.prefetch(self.parallelization)
print("Prefetch suggestion:", toPrefetch)
for blockNumber in toPrefetch:
if blockNumber < len(self.fileobj.block_boundaries) and blockNumber not in self.blockCache:
fetchedBlock = self.pool.apply_async(
ParallelXZReader._decodeBlock,
(self.filename, self.fileobj.block_boundaries[blockNumber], self._blockSize(blockNumber)),
)
self.blockCache[blockNumber] = fetchedBlock
pendingBlocks += 1
print("pending blocks:", pendingBlocks)
print("Got blocks:", blocks)
while blocks:
block = blocks.pop(0)
# Note that it is perfectly safe to call AsyncResult.get multiple times!
toAppend = block.get()
print(f"Append view ({firstBlockOffset},{ size}) of block of length {len(toAppend)}")
if firstBlockOffset > 0:
toAppend = toAppend[firstBlockOffset:]
if not blocks:
toAppend = toAppend[:size]
firstBlockOffset = 0
result += toAppend
if blockNumber == 21:
print("Result:", len(result))
# TODO fall back to reading directly from fileobj if prefetch suggests nothing at all to improve latency!
# self.fileobj.seek(self.offset)
# result = self.fileobj.read(size)
self.offset += len(result)
return result
    @overrides(io.BufferedIOBase)
    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        if whence == io.SEEK_CUR:
            self.offset += offset
        elif whence == io.SEEK_END:
            # Let the underlying file object determine the decompressed size.
            self.offset = self.fileobj.seek(0, io.SEEK_END) + offset
        elif whence == io.SEEK_SET:
            self.offset = offset

        if self.offset < 0:
            raise ValueError("Trying to seek before the start of the file!")
        return self.offset

    @overrides(io.BufferedIOBase)
    def tell(self) -> int:
        return self.offset
```

Manual Shell Execution:

```bash
base64 /dev/urandom | head -c $(( 4*1024*1024*1024 )) > large
xz -T 0 --keep large
python3 decompress-xz-parallel.py large.xz
```