Skip to content
89 changes: 61 additions & 28 deletions hca/dss/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from datetime import datetime
from fnmatch import fnmatchcase
import hashlib
import shutil
import os
import re
import tempfile
Expand Down Expand Up @@ -202,7 +203,8 @@ def download(self,
no_metadata=False,
no_data=False,
num_retries=10,
min_delay_seconds=0.25):
min_delay_seconds=0.25,
delete_cache=False):
"""
Download a bundle and save it to the local filesystem as a directory.

Expand All @@ -225,6 +227,16 @@ def download(self,
:param int num_retries: The initial quota of download failures to accept before exiting due to failures.
The number of retries increases and decreases as file chunks succeed and fail.
:param float min_delay_seconds: The minimum number of seconds to wait in between retries.
:param bool delete_cache: When downloading files, the folder '.hca' contains additional file references
(hard links) that serve as a cache when downloading. Specifying this option will
delete that cache after the files are downloaded.

Note that deleting the cache (the folder '.hca') prevents efficient switching of
the download layout (see --layout), especially when rerunning the download with
additional rows in the manifest. Once you delete the cache, the next invocation
will download all files again fresh. Also note that the cache does not take up
significant disk space. Each file is stored exactly once; the cache
contains only references (hard links) to the downloaded files.

Download a bundle and save it to the local filesystem as a directory.
By default, all data and metadata files are downloaded. To disable the downloading of data files,
Expand Down Expand Up @@ -266,6 +278,9 @@ def download(self,
if errors:
raise RuntimeError('{} file(s) failed to download'.format(errors))

if delete_cache:
shutil.rmtree(self._cache_path(download_dir))

def download_manifest(self,
manifest,
replica,
Expand All @@ -274,32 +289,43 @@ def download_manifest(self,
no_data=False,
num_retries=10,
min_delay_seconds=0.25,
download_dir=''):
download_dir='',
delete_cache=False):
"""
Process the given manifest file in TSV (tab-separated values) format and download the files referenced by it.

:param str layout: The layout of the downloaded files. Currently two options are supported, 'none' (the
default), and 'bundle'.
:param str manifest: The path to a TSV (tab-separated values) file listing files to download. If the directory
for download already contains the manifest, the manifest will be overwritten to include a column with paths
into the filestore.
into the cache.
:param str replica: The replica from which to download. The supported replicas are: `aws` for Amazon Web
Services, and `gcp` for Google Cloud Platform. [aws, gcp]
:param int num_retries: The initial quota of download failures to accept before exiting due to
failures. The number of retries increases and decreases as file chunks succeed and fail.
:param float min_delay_seconds: The minimum number of seconds to wait in between retries for downloading any
file
:param str download_dir: The directory into which to download

Files are always downloaded to a cache / filestore directory called '.hca'. This directory is created in the
:param bool delete_cache: When downloading files, the folder '.hca' contains additional file references
(hard links) that serve as a cache when downloading. Specifying this option will
delete that cache after the files are downloaded.

Note that deleting the cache (the folder '.hca') prevents efficient switching of
the download layout (see --layout), especially when rerunning the download with
additional rows in the manifest. Once you delete the cache, the next invocation
will download all files again fresh. Also note that the cache does not take up
significant disk space. Each file is stored exactly once; the cache
contains only references (hard links) to the downloaded files.

Files are always downloaded to a cache / cache directory called '.hca'. This directory is created in the
current directory where download is initiated. A copy of the manifest used is also written to the current
directory. This manifest has an added column that lists the paths of the files within the '.hca' filestore.
directory. This manifest has an added column that lists the paths of the files within the '.hca' cache.
.
The default layout is **none**. In this layout all of the files are downloaded to the filestore and the
The default layout is **none**. In this layout all of the files are downloaded to the cache and the
recommended way of accessing the files is by parsing the manifest copy that's written to the download
directory.
.
The bundle layout still downloads all of files to the filestore. For each bundle mentioned in the
The bundle layout still downloads all of the files to the cache. For each bundle mentioned in the
manifest a directory is created. All relevant metadata files for each bundle are linked into these
directories in addition to relevant data files mentioned in the manifest.
.
Expand All @@ -323,7 +349,7 @@ def download_manifest(self,
if layout == 'none':
if no_metadata or no_data:
raise ValueError("--no-metadata and --no-data are only compatible with the 'bundle' layout")
self._download_manifest_filestore(manifest, replica, num_retries, min_delay_seconds, download_dir)
self._download_manifest_cache(manifest, replica, num_retries, min_delay_seconds, download_dir)
elif layout == 'bundle':
self._download_manifest_bundle(manifest,
replica,
Expand All @@ -335,20 +361,23 @@ def download_manifest(self,
else:
raise ValueError('Invalid layout {} not one of [none, bundle]'.format(layout))

def _download_manifest_filestore(self,
manifest,
replica,
num_retries,
min_delay_seconds,
download_dir):
if delete_cache:
shutil.rmtree(self._cache_path(download_dir))

def _download_manifest_cache(self,
manifest,
replica,
num_retries,
min_delay_seconds,
download_dir):
fieldnames, rows = self._parse_manifest(manifest)
errors = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
futures_to_dss_file = {}
for row in rows:
dss_file = DSSFile.from_manifest_row(row, replica)
future = executor.submit(self._download_to_filestore, download_dir, dss_file,
future = executor.submit(self._download_to_cache, download_dir, dss_file,
num_retries=num_retries, min_delay_seconds=min_delay_seconds)
futures_to_dss_file[future] = dss_file
for future in concurrent.futures.as_completed(futures_to_dss_file):
Expand Down Expand Up @@ -454,7 +483,7 @@ def _bundle_download_tasks(self,

logger.info("File %s: Retrieving...", filename)
file_path = os.path.join(walking_dir, filename_base)
yield dss_file, functools.partial(self._download_and_link_to_filestore,
yield dss_file, functools.partial(self._download_and_link_to_cache,
download_dir,
dss_file,
file_path,
Expand Down Expand Up @@ -538,9 +567,9 @@ def _download_manifest_tasks(self,
num_retries=num_retries,
min_delay_seconds=min_delay_seconds)

def _download_to_filestore(self, download_dir, dss_file, num_retries=10, min_delay_seconds=0.25):
def _download_to_cache(self, download_dir, dss_file, num_retries=10, min_delay_seconds=0.25):
"""
Attempt to download the data and save it in the 'filestore' location dictated by self._file_path()
Attempt to download the data and save it in the 'cache' location dictated by self._file_path()
"""
dest_path = self._file_path(dss_file.sha256, download_dir)
if os.path.exists(dest_path):
Expand All @@ -551,11 +580,11 @@ def _download_to_filestore(self, download_dir, dss_file, num_retries=10, min_del
logger.info("Download '%s' to '%s'.", dss_file.name, dest_path)
return dest_path

def _download_and_link_to_filestore(self, download_dir, dss_file, file_path, num_retries, min_delay_seconds):
file_store_path = self._download_to_filestore(download_dir,
dss_file,
num_retries=num_retries,
min_delay_seconds=min_delay_seconds)
def _download_and_link_to_cache(self, download_dir, dss_file, file_path, num_retries, min_delay_seconds):
    """
    Fetch a file into the '.hca' cache, then expose it at the requested path via a hard link.

    :param str download_dir: the directory under which the cache lives
    :param dss_file: the file to download (a manifest entry; presumably a DSSFile — confirm with caller)
    :param str file_path: the destination path at which to hard-link the cached copy
    :param int num_retries: retry budget forwarded to the cache download
    :param float min_delay_seconds: minimum back-off between retries, forwarded to the cache download
    """
    # Download (or reuse) the single cached copy keyed by the file's checksum ...
    cached_copy = self._download_to_cache(
        download_dir, dss_file, num_retries=num_retries, min_delay_seconds=min_delay_seconds)
    # ... then materialize it at the layout-specific location without duplicating data.
    self._make_path_if_necessary(file_path)
    hardlink(cached_copy, file_path)

Expand Down Expand Up @@ -656,17 +685,21 @@ def _do_download_file(self, dss_file, fh, num_retries, min_delay_seconds):
raise
return hasher.hexdigest()

@classmethod
def _cache_path(cls, download_dir):
    """
    Return the path of the versioned '.hca' download cache rooted at *download_dir*.

    :param str download_dir: the directory into which downloads were requested
    :return: the cache directory path, i.e. <download_dir>/.hca/v2
    """
    cache_parts = (download_dir, '.hca', 'v2')
    return os.path.join(*cache_parts)

@classmethod
def _file_path(cls, checksum, download_dir):
"""
returns a file's relative local path based on the nesting parameters and the files hash
:param checksum: a string checksum
:param download_dir: root directory for filestore
:param download_dir: root directory for cache
:return: relative Path object
"""
checksum = checksum.lower()
file_prefix = '_'.join(['files'] + list(map(str, cls.DIRECTORY_NAME_LENGTHS)))
path_pieces = [download_dir, '.hca', 'v2', file_prefix]
path_pieces = [cls._cache_path(download_dir), file_prefix]
checksum_index = 0
assert(sum(cls.DIRECTORY_NAME_LENGTHS) <= len(checksum))
for prefix_length in cls.DIRECTORY_NAME_LENGTHS:
Expand All @@ -675,7 +708,7 @@ def _file_path(cls, checksum, download_dir):
path_pieces.append(checksum)
return os.path.join(*path_pieces)

def _write_output_manifest(self, manifest, filestore_root):
def _write_output_manifest(self, manifest, cache_root):
"""
Adds the file path column to the manifest and writes the copy to the current directory. If the original manifest
is in the current directory it is overwritten with a warning.
Expand All @@ -689,7 +722,7 @@ def _write_output_manifest(self, manifest, filestore_root):
writer = csv.DictWriter(f, fieldnames, delimiter=delimiter, quoting=csv.QUOTE_NONE)
writer.writeheader()
for row in source_manifest:
row['file_path'] = self._file_path(row['file_sha256'], filestore_root)
row['file_path'] = self._file_path(row['file_sha256'], cache_root)
writer.writerow(row)
if os.path.isfile(output):
logger.warning('Overwriting manifest %s', output)
Expand Down
12 changes: 6 additions & 6 deletions test/unit/test_dss_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ def test_file_path(self):
self.assertEqual(parts, ['.', '.hca', 'v2', 'files_1_3_2', 'a', 'bcd', 'ef', 'abcdefghij'])

@patch('hca.dss.DSSClient.DIRECTORY_NAME_LENGTHS', [1, 3, 2])
def test_file_path_filestore_root(self):
self.assertRaises(AssertionError, self.dss._file_path, 'a', 'nested_filestore')
parts = self.dss._file_path('abcdefghij', 'nested_filestore').split(os.sep)
self.assertEqual(parts, ['nested_filestore', '.hca', 'v2', 'files_1_3_2', 'a', 'bcd', 'ef', 'abcdefghij'])
def test_file_path_cache_root(self):
    # A checksum shorter than sum(DIRECTORY_NAME_LENGTHS) — patched to [1, 3, 2]
    # by the decorator above — must trip the length assertion in _file_path.
    self.assertRaises(AssertionError, self.dss._file_path, 'a', 'nested_cache')
    # A sufficiently long checksum yields a path rooted at the given cache root,
    # with nested directories sliced from the checksum prefix at lengths 1, 3, 2.
    parts = self.dss._file_path('abcdefghij', 'nested_cache').split(os.sep)
    self.assertEqual(parts, ['nested_cache', '.hca', 'v2', 'files_1_3_2', 'a', 'bcd', 'ef', 'abcdefghij'])

@unittest.skipIf(os.name is 'nt', 'Unable to test on Windows') # TODO windows testing refactor
@patch('logging.Logger.warning')
Expand Down Expand Up @@ -314,10 +314,10 @@ def _assert_links(self, prefix):
if sys.version_info >= (3,) or platform.system() != 'Windows':
for linked_file in self.data_files(prefix=prefix):
self.assertEqual(os.stat(linked_file).st_nlink, 2,
'Expected one link for the "filestore" entry and link in bundle download')
'Expected one link for the "cache" entry and link in bundle download')
for linked_file in self.metadata_files(prefix=prefix):
self.assertEqual(os.stat(linked_file).st_nlink, 4,
'Expected one link for the "filestore" entry and one for each bundle')
'Expected one link for the "cache" entry and one for each bundle')

def _assert_all_files_downloaded(self, more_files=None, prefix=''):
bundle_files = self.data_files(prefix=prefix).union(self.metadata_files(prefix=prefix))
Expand Down