Skip to content

Commit 318b8a0

Browse files
authored
Merge pull request #34 from rok4/feature/stockage_HTTP
Ajout du stockage HTTP dans Storage
2 parents 685b463 + 7ec2179 commit 318b8a0

File tree

3 files changed

+223
-16
lines changed

3 files changed

+223
-16
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
## Summary
22

3+
## Changelog
4+
35
<!--
46
### [Added]
57

src/rok4/Storage.py

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
33
Available storage types are :
44
- S3 (path are prefixed with `s3://`)
5-
- CEPH (path are preffixed with `ceph://`)
6-
- FILE (path are preffixed with `file://`, but it is the default paths' interpretation)
5+
- CEPH (path are prefixed with `ceph://`)
6+
- FILE (path are prefixed with `file://`, but it is the default paths' interpretation)
7+
- HTTP (path are prefixed with `http://`)
8+
- HTTPS (path are prefixed with `https://`)
79
810
Depending on the function, not all storage types are necessarily available.
911
@@ -35,6 +37,7 @@
3537
import os
3638
import rados
3739
import hashlib
40+
import requests
3841
from typing import Dict, List, Tuple, Union
3942
from enum import Enum
4043
from shutil import copyfile
@@ -49,6 +52,8 @@ class StorageType(Enum):
4952
FILE = "file://"
5053
S3 = "s3://"
5154
CEPH = "ceph://"
55+
HTTP = "http://"
56+
HTTPS = "https://"
5257

5358

5459
__S3_CLIENTS = {}
@@ -70,6 +75,7 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s
7075
Returns:
7176
Tuple[Dict[str, Union['boto3.client',str]], str, str]: the S3 information (client, host, key, secret) and the simple bucket name
7277
"""
78+
7379
global __S3_CLIENTS, __S3_DEFAULT_CLIENT
7480

7581
if not __S3_CLIENTS:
@@ -181,7 +187,6 @@ def __get_ceph_ioctx(pool: str) -> "rados.Ioctx":
181187

182188
def disconnect_ceph_clients() -> None:
183189
"""Clean CEPH clients"""
184-
185190
global __CEPH_CLIENT, __CEPH_IOCTXS
186191
__CEPH_CLIENT = None
187192
__CEPH_IOCTXS = {}
@@ -213,6 +218,10 @@ def get_infos_from_path(path: str) -> Tuple[StorageType, str, str, str]:
213218
return StorageType.CEPH, path[7:], pool_name, object_name
214219
elif path.startswith("file://"):
215220
return StorageType.FILE, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:])
221+
elif path.startswith("http://"):
222+
return StorageType.HTTP, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:])
223+
elif path.startswith("https://"):
224+
return StorageType.HTTPS, path[8:], os.path.dirname(path[8:]), os.path.basename(path[8:])
216225
else:
217226
return StorageType.FILE, path, os.path.dirname(path), os.path.basename(path)
218227

@@ -285,7 +294,6 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
285294
Returns:
286295
str: Data binary content
287296
"""
288-
289297
storage_type, path, tray_name, base_name = get_infos_from_path(path)
290298

291299
if storage_type == StorageType.S3:
@@ -354,6 +362,19 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
354362
except Exception as e:
355363
raise StorageError("FILE", e)
356364

365+
elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
366+
367+
if range is None :
368+
try:
369+
reponse = requests.get(f"{storage_type.value}{path}", stream=True)
370+
data = reponse.content
371+
if reponse.status_code == 404 :
372+
raise FileNotFoundError(f"{storage_type.value}{path}")
373+
except Exception as e:
374+
raise StorageError(storage_type.name, e)
375+
else :
376+
raise NotImplementedError
377+
357378
else:
358379
raise StorageError("UNKNOWN", "Unhandled storage type to read binary data")
359380

@@ -449,6 +470,15 @@ def get_size(path: str) -> int:
449470
except Exception as e:
450471
raise StorageError("FILE", e)
451472

473+
elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
474+
475+
try:
476+
# stream=True makes the request download only the headers at first; the body is fetched only if it is accessed
477+
reponse = requests.get(storage_type.value + path, stream=True).headers["content-length"]
478+
return reponse
479+
except Exception as e:
480+
raise StorageError(storage_type.name, e)
481+
452482
else:
453483
raise StorageError("UNKNOWN", "Unhandled storage type to get size")
454484

@@ -495,6 +525,17 @@ def exists(path: str) -> bool:
495525
elif storage_type == StorageType.FILE:
496526
return os.path.exists(path)
497527

528+
elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
529+
530+
try:
531+
response = requests.get(storage_type.value + path, stream=True)
532+
if response.status_code == 200 :
533+
return True
534+
else :
535+
return False
536+
except Exception as e:
537+
raise StorageError(storage_type.name, e)
538+
498539
else:
499540
raise StorageError("UNKNOWN", "Unhandled storage type to test if exists")
500541

@@ -798,6 +839,53 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
798839
f"CEPH and S3", f"Cannot copy CEPH object {from_path} to S3 object {to_path} : {e}"
799840
)
800841

842+
elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.FILE :
843+
844+
try:
845+
response = requests.get(from_type.value + from_path, stream = True)
846+
with open(to_path, "wb") as f:
847+
for chunk in response.iter_content(chunk_size=65536) :
848+
if chunk:
849+
f.write(chunk)
850+
851+
except Exception as e:
852+
raise StorageError(f"HTTP(S) and FILE", f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}")
853+
854+
elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.CEPH :
855+
856+
to_ioctx = __get_ceph_ioctx(to_tray)
857+
858+
try:
859+
response = requests.get(from_type.value + from_path, stream = True)
860+
offset = 0
861+
for chunk in response.iter_content(chunk_size=65536) :
862+
if chunk:
863+
size = len(chunk)
864+
to_ioctx.write(to_base_name, chunk, offset)
865+
offset += size
866+
867+
except Exception as e:
868+
raise StorageError(f"HTTP(S) and CEPH", f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}")
869+
870+
elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.S3 :
871+
872+
to_s3_client, to_bucket = __get_s3_client(to_tray)
873+
874+
try:
875+
response = requests.get(from_type.value + from_path, stream = True)
876+
with tempfile.NamedTemporaryFile("w+b",delete=False) as f:
877+
name_fich = f.name
878+
for chunk in response.iter_content(chunk_size=65536) :
879+
if chunk:
880+
f.write(chunk)
881+
882+
to_s3_client["client"].upload_file(name_fich, to_tray, to_base_name)
883+
884+
os.remove(name_fich)
885+
886+
except Exception as e:
887+
raise StorageError(f"HTTP(S) and S3", f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}")
888+
801889
else:
802890
raise StorageError(
803891
f"{from_type.name} and {to_type.name}",

0 commit comments

Comments
 (0)