Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/DIRAC/Resources/Catalog/FileCatalogClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ def getReplicas(self, lfns, allStatus=False, timeout=120):
successful = {}
failed = {}

for chunk in breakListIntoChunks(lfns, GET_REPLICAS_CHUNK_SIZE):
# We want to sort the lfns because of the way the server groups the db queries by
# directory. So if we sort them, the grouping is more efficient.
for chunk in breakListIntoChunks(sorted(lfns), GET_REPLICAS_CHUNK_SIZE):
rpcClient = self._getRPC(timeout=timeout)
result = rpcClient.getReplicas(chunk, allStatus)

Expand Down
6 changes: 3 additions & 3 deletions src/DIRAC/Resources/Catalog/Utilities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
""" DIRAC FileCatalog client utilities
"""
"""DIRAC FileCatalog client utilities"""

import os
import errno
import functools
Expand All @@ -16,7 +16,7 @@ def checkArgumentDict(path):
"""Check and process format of the arguments to FileCatalog methods"""
if isinstance(path, str):
urls = {path: True}
elif isinstance(path, list):
elif isinstance(path, (list, set)):
urls = {}
for url in path:
urls[url] = True
Expand Down
25 changes: 12 additions & 13 deletions src/DIRAC/TransformationSystem/Agent/TransformationAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,19 +500,18 @@ def __getDataReplicas(self, transDict, lfns, clients, forJobs=True):
startTime = time.time()
self._logInfo(f"Getting replicas for {len(newLFNs)} files from catalog", method=method, transID=transID)
newReplicas = {}
for chunk in breakListIntoChunks(newLFNs, 10000):
res = self._getDataReplicasDM(transID, chunk, clients, forJobs=forJobs)
if res["OK"]:
reps = {lfn: ses for lfn, ses in res["Value"].items() if ses}
newReplicas.update(reps)
self.__updateCache(transID, reps)
else:
self._logWarn(
f"Failed to get replicas for {len(chunk)} files",
res["Message"],
method=method,
transID=transID,
)
res = self._getDataReplicasDM(transID, newLFNs, clients, forJobs=forJobs)
if res["OK"]:
newReplicas = {lfn: ses for lfn, ses in res["Value"].items() if ses}

self.__updateCache(transID, newReplicas)
else:
self._logWarn(
f"Failed to get replicas for {len(newLFNs)} files",
res["Message"],
method=method,
transID=transID,
)

self._logInfo(
f"Obtained {len(newReplicas)} replicas from catalog in {time.time() - startTime:.1f} seconds",
Expand Down
Loading