diff --git a/src/analyzers/aes_cbc_analyzer.py b/src/analyzers/aes_cbc_analyzer.py index 531f4f3..8326c99 100644 --- a/src/analyzers/aes_cbc_analyzer.py +++ b/src/analyzers/aes_cbc_analyzer.py @@ -26,11 +26,13 @@ class Aes_Cbc_Analyzer(CryptoAnalyzer): def identifier_algo(self, chemin_fichier_chiffre: str) -> float: ''' - Détermine la probabilité que l'algo de chiffrement utilisé soit l'aes cbc en: + Estime la probabilité que le fichier soit chiffré en AES-CBC. - - recherchant l'IV en tête - - vérifiant si le reste du fichier en dehors de l'IV a une taille multiple de 16 octets - - déterminant si l'entropie est assez élevée dans le fichier chiffré (>7.5) + Principes: + - IV attendu en début de fichier (16 octets), suivi des données chiffrées. + - En CBC, le corps (hors IV) est multiple de 16 octets (padding par blocs). + - L'entropie élevée est un indicateur secondaire (faible poids). + - On pénalise un motif typique AES-GCM (nonce 12B + tag 16B + corps non multiple de 16). Args: chemin_fichier_chiffre(str): Le chemin du fichier chiffré à traiter (mission1.enc). @@ -43,6 +45,7 @@ def identifier_algo(self, chemin_fichier_chiffre: str) -> float: with open(chemin_fichier_chiffre, "rb") as f: contenu_fichier = f.read() + # Garde simple: impossible d'avoir IV (16B) si le fichier est trop court if len(contenu_fichier) < 16: return 0.0 @@ -51,18 +54,19 @@ def identifier_algo(self, chemin_fichier_chiffre: str) -> float: score: float = 0.0 - # CBC: taille des données chiffrées multiple de 16 + # CBC: le corps doit être multiple de 16 (car padding par blocs de 16) if len(corps) % 16 == 0 and len(corps) > 0: score += 0.55 else: score -= 0.25 - # Entropie globale des données + # Entropie globale des données (indicateur secondaire, on ajoute un bonus léger) ent = calculer_entropie(corps) if ent > 7.3: score += 0.35 - # Négatifs structure GCM: nonce 12B au début + tag 16B à la fin et corps non-multiple de 16 + # Négatif contre GCM: motif nonce 12B au début + tag 16B à la fin + corps non multiple de 16 + # Si ce motif est détecté, cela contredit CBC → forte pénalité if len(contenu_fichier) >= 28: from src.utils import calculer_entropie as entf nonce12 = contenu_fichier[:12] @@ -73,7 +77,7 @@ def identifier_algo(self, chemin_fichier_chiffre: str) -> float: ): score -= 0.60 - # Clamp [0,1] + # Normalisation: on borne toujours le score dans [0, 1] if score < 0.0: score = 0.0 if score > 1.0: diff --git a/src/analyzers/aes_gcm_analyzer.py b/src/analyzers/aes_gcm_analyzer.py index f9b8a77..872a392 100644 --- a/src/analyzers/aes_gcm_analyzer.py +++ b/src/analyzers/aes_gcm_analyzer.py @@ -6,226 +6,230 @@ import re class Aes_Gcm_Analyzer(CryptoAnalyzer): - '''Détermine si l'algo aes_gcm est utilisé, génère des clés et tente de de déchffrer un fichier chiffré en utilisant les clés générées. - - Cette classe a trois méthodes principales: - - identifier_algo: Détermine si l'algo de chiffrement utilsé sur le fichier chiffré qui lui est passé en paramètre est l'aes_gcm. - - generer_cles_candidates: Génère une liste de clés candidates pour le déchiffrement du fichier chiffré - - dechiffrer: fait le déchiffrement proprement dit sur la base de la liste des clés générées + '''Détermine si l'algo aes_gcm est utilisé, génère des clés et tente de de déchffrer un fichier chiffré en utilisant les clés générées. - Attributes: - _PBKDF2_SALT: le salt utilisé pour le chiffrement - _PBKDF2_ITERATIONS: le nombre d'itérations faites au chiffrement - _PBKDF2_LONGUEUR_CLE: la longueur en octets de la clé à utiliser - - ''' - - _PBKDF2_SALT: bytes = b"AES_GCM_SALT_2024" #Fourni - _PBKDF2_ITERATIONS: int = 10000 #Fourni - _PBKDF2_LONGUEUR_CLE: int = 32 #Longueur de la clé - - def __filtrer_dictionnaire_par_indices(self, chemin_dictionnaire: str) -> List[str]: - """ - Filtre le dictionnaire en se basant sur les indices de la mission 4. - L'indice pointe vers le format de clé "Acronyme en majuscules + 4 chiffres". - """ - mots_filtres: List[str] = [] - annee_courante: str = "2024" # Normalement 2025 mais on considère 2024 pour se conformer à la wordlist - motif_acronyme = re.compile(r"^[A-Z]{4}$") - - try: - with open(chemin_dictionnaire, "r", encoding="utf-8") as f: - for ligne in f: - mot: str = ligne.strip() - if mot.endswith(annee_courante): - acronyme: str = mot[:-4] - if motif_acronyme.match(acronyme): - mots_filtres.append(mot) - except FileNotFoundError: - print(f"Erreur : Le fichier de dictionnaire '{chemin_dictionnaire}' est introuvable.") - return [] - - return mots_filtres - - - - def generer_cles_candidates(self, chemin_dictionnaire: str) -> List[bytes]: - ''' - Génère les clées candidates pour déchiffrer le fichier à partir de la liste retournée par filtrer_dictionnaire_par_indices. - - Args: - chemin_dictionnaire(str): le chemin du dictionnaire de mots de passes pour l'attaque par dictionnaire. + Cette classe a trois méthodes principales: + - identifier_algo: Détermine si l'algo de chiffrement utilsé sur le fichier chiffré qui lui est passé en paramètre est l'aes_gcm. + - generer_cles_candidates: Génère une liste de clés candidates pour le déchiffrement du fichier chiffré + - dechiffrer: fait le déchiffrement proprement dit sur la base de la liste des clés générées - Returns: - list[bytes]: liste des clés candidates. + Attributes: + _PBKDF2_SALT: le salt utilisé pour le chiffrement + _PBKDF2_ITERATIONS: le nombre d'itérations faites au chiffrement + _PBKDF2_LONGUEUR_CLE: la longueur en octets de la clé à utiliser ''' - mots_de_passe_cible: List[str] = self.__filtrer_dictionnaire_par_indices(chemin_dictionnaire) - - clees_candidates: List[bytes] = [] - - for mot_de_passe in mots_de_passe_cible: - kdf = PBKDF2HMAC( - algorithm=hashes.SHA256(), - length=self._PBKDF2_LONGUEUR_CLE, - iterations=self._PBKDF2_ITERATIONS, - salt=self._PBKDF2_SALT - ) - mot_de_passe_en_octets: bytes = mot_de_passe.encode('utf-8') - cle_derivee: bytes = kdf.derive(mot_de_passe_en_octets) - clees_candidates.append(cle_derivee) - - return clees_candidates - - def identifier_algo(self, chemin_fichier_chiffre: str) -> float: - """ - Identifie si le fichier utilise l'algorithme AES GCM. + _PBKDF2_SALT: bytes = b"AES_GCM_SALT_2024" #Fourni + _PBKDF2_ITERATIONS: int = 10000 #Fourni + _PBKDF2_LONGUEUR_CLE: int = 32 #Longueur de la clé - Cette méthode utilise plusieurs heuristiques spécifiques à AES GCM pour se différencier d'AES CBC : - - Structure attendue: nonce (12 bytes) + données chiffrées + tag (16 bytes) - - Pas de padding (donc taille du corps non contrainte par 16) - - Poids fort donné aux vérifications structurelles, poids faible à l'entropie - - Args: - chemin_fichier_chiffre(str): Le chemin vers le fichier chiffré. + def __filtrer_dictionnaire_par_indices(self, chemin_dictionnaire: str) -> List[str]: + """ + Filtre le dictionnaire en se basant sur les indices de la mission 4. + L'indice pointe vers le format de clé "Acronyme en majuscules + 4 chiffres". + """ + mots_filtres: List[str] = [] + annee_courante: str = "2024" # Normalement 2025 mais on considère 2024 pour se conformer à la wordlist + motif_acronyme = re.compile(r"^[A-Z]{4}$") + + try: + with open(chemin_dictionnaire, "r", encoding="utf-8") as f: + for ligne in f: + mot: str = ligne.strip() + if mot.endswith(annee_courante): + acronyme: str = mot[:-4] + if motif_acronyme.match(acronyme): + mots_filtres.append(mot) + except FileNotFoundError: + print(f"Erreur : Le fichier de dictionnaire '{chemin_dictionnaire}' est introuvable.") + return [] + + return mots_filtres + + def generer_cles_candidates(self, chemin_dictionnaire: str) -> List[bytes]: + ''' + Génère les clées candidates pour déchiffrer le fichier à partir de la liste retournée par filtrer_dictionnaire_par_indices. - Returns: - float: Probabilité que le fichier utilise AES GCM (0.0 à 1.0). - """ - try: - with open(chemin_fichier_chiffre, "rb") as f: - contenu_fichier: bytes = f.read() - - # Gate 1: taille minimale (nonce 12 + tag 16 + au moins 1 octet de corps) - if len(contenu_fichier) < 12 + 1 + 16: - return 0.0 + Args: + chemin_dictionnaire(str): le chemin du dictionnaire de mots de passes pour l'attaque par dictionnaire. + + Returns: + list[bytes]: liste des clés candidates. + ''' + + mots_de_passe_cible: List[str] = self.__filtrer_dictionnaire_par_indices(chemin_dictionnaire) + + clees_candidates: List[bytes] = [] + + for mot_de_passe in mots_de_passe_cible: + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=self._PBKDF2_LONGUEUR_CLE, + iterations=self._PBKDF2_ITERATIONS, + salt=self._PBKDF2_SALT + ) + mot_de_passe_en_octets: bytes = mot_de_passe.encode('utf-8') + cle_derivee: bytes = kdf.derive(mot_de_passe_en_octets) + clees_candidates.append(cle_derivee) + + return clees_candidates + + def identifier_algo(self, chemin_fichier_chiffre: str) -> float: + """ + Estime la probabilité que le fichier soit chiffré en AES-GCM. + + Idée générale: + - Motif structurel attendu: nonce (12 octets) en tête, corps de données chiffrées, puis tag (16 octets) en fin. + - Pas de padding bloc: la taille du corps n'est pas forcément multiple de 16. + - Les vérifications structurelles ont un poids fort. L'entropie apporte seulement des signaux faibles. + + Args: + chemin_fichier_chiffre(str): Le chemin vers le fichier chiffré. + + Returns: + float: Probabilité que le fichier utilise AES GCM (0.0 à 1.0). + """ + try: + with open(chemin_fichier_chiffre, "rb") as f: + contenu_fichier: bytes = f.read() - # Placement attendu: [0:12]=nonce, [-16:]=tag, [12:-16]=corps - nonce: bytes = contenu_fichier[:12] - tag: bytes = contenu_fichier[-16:] - corps: bytes = contenu_fichier[12:-16] + # Garde 1: taille minimale (nonce 12 + tag 16 + au moins 1 octet de corps) + if len(contenu_fichier) < 12 + 1 + 16: + return 0.0 - # Gate 2: tailles strictes - if len(nonce) != 12 or len(tag) != 16 or len(corps) <= 0: - return 0.0 + # Placement attendu: [0:12] = nonce, [-16:] = tag, [12:-16] = corps + nonce: bytes = contenu_fichier[:12] + tag: bytes = contenu_fichier[-16:] + corps: bytes = contenu_fichier[12:-16] + + # Garde 2: tailles strictes + if len(nonce) != 12 or len(tag) != 16 or len(corps) <= 0: + return 0.0 - from src.utils import calculer_entropie - score: float = 0.0 - - # Structure GCM forte (poids majeur) - # Corps non multiple de 16 (pas de padding bloc) - if len(corps) % 16 != 0: - score += 0.35 - else: - # Bloc-mode typique → forte pénalité - score -= 0.40 - - # Taille totale multiple de 16 → peu probable pour GCM (AES/Blowfish like) - if len(contenu_fichier) % 16 == 0: - score -= 0.35 - - # Négatifs forts supplémentaires: motifs modes à blocs - # - IV 16B plausible en tête + corps multiple de 16 (AES-CBC) - if len(contenu_fichier) >= 16: - iv16 = contenu_fichier[:16] - corps16 = contenu_fichier[16:] + from src.utils import calculer_entropie + score: float = 0.0 + + # Signal positif fort (structure GCM): corps non multiple de 16 → pas de padding bloc + if len(corps) % 16 != 0: + score += 0.50 + else: + # Signal négatif (mode bloc typique) : pénalité renforcée + score -= 0.50 + + # Taille totale multiple de 16 : peu probable pour GCM (plus proche AES/Blowfish) + if len(contenu_fichier) % 16 == 0: + score -= 0.40 + + # Autres signaux négatifs "mode bloc": + # - IV 16B plausible en tête + corps multiple de 16 (plutôt AES-CBC) + if len(contenu_fichier) >= 16: + iv16 = contenu_fichier[:16] + corps16 = contenu_fichier[16:] + try: + if len(corps16) > 0 and (len(corps16) % 16) == 0 and calculer_entropie(iv16) > 7.0: + score -= 0.30 + except Exception: + pass + # - IV 8B plausible en tête + corps multiple de 8 (plutôt Blowfish) + if len(contenu_fichier) >= 8: + iv8 = contenu_fichier[:8] + corps8 = contenu_fichier[8:] + if len(corps8) > 0 and (len(corps8) % 8) == 0: + score -= 0.25 + + # Si les 16 derniers octets ne ressemblent PAS à un tag AEAD (faible entropie), on pénalise. + queue16 = contenu_fichier[-16:] if len(contenu_fichier) >= 16 else b"" try: - if len(corps16) > 0 and (len(corps16) % 16) == 0 and calculer_entropie(iv16) > 7.0: + if queue16 and calculer_entropie(queue16) <= 7.0: score -= 0.30 except Exception: pass - # - IV 8B plausible en tête + corps multiple de 8 (Blowfish) - if len(contenu_fichier) >= 8: - iv8 = contenu_fichier[:8] - corps8 = contenu_fichier[8:] - if len(corps8) > 0 and (len(corps8) % 8) == 0: - score -= 0.25 - - # Si les 16 derniers octets ne ressemblent PAS à un tag AEAD (faible entropie), pénaliser - queue16 = contenu_fichier[-16:] if len(contenu_fichier) >= 16 else b"" - try: - if queue16 and calculer_entropie(queue16) <= 7.0: - score -= 0.25 - except Exception: - pass - - # Entropie: signaux faibles (ne doivent pas rendre positif seuls) - if calculer_entropie(tag) > 7.2: - score += 0.10 - if len(corps) > 0 and calculer_entropie(corps) > 7.0: - score += 0.10 - # Nonce aléatoire plausible (faible poids) - try: - ent_nonce = calculer_entropie(nonce) - if ent_nonce > 7.0: - score += 0.08 - else: - # Nonce peu aléatoire: contre-signal pour GCM - score -= 0.10 - except Exception: - pass - - # Clamp [0,1] - if score < 0.0: - score = 0.0 - if score > 1.0: - score = 1.0 - return score - - except FileNotFoundError: - print(f"Erreur : Le fichier '{chemin_fichier_chiffre}' est introuvable.") - return 0.0 - except Exception as e: - print(f"Erreur lors de l'identification de l'algorithme AES GCM: {e}") - return 0.0 - - def dechiffrer(self, chemin_fichier_chiffre: str, cle_donnee: bytes) -> bytes: - """ - Déchiffre le fichier chiffré avec la clé donnée. - - Args: - chemin_fichier_chiffre(str): Le chemin vers le fichier chiffré. - cle_donnee(bytes): La clé de déchiffrement. - - Returns: - bytes: Le contenu déchiffré ou une chaîne vide en cas d'échec. - """ - try: - # Validation taille de clé: AES-256 => 32 octets - if len(cle_donnee) != self._PBKDF2_LONGUEUR_CLE: - raise ValueError("Erreur : La clé AES-256 doit faire 32 bytes") - - # Lecture du fichier: nonce (12B) + données + tag (16B) - with open(chemin_fichier_chiffre, "rb") as f: - donnees = f.read() - - if len(donnees) < 12 + 16: - return b"" - nonce = donnees[:12] - ciphertext_tag = donnees[12:] - if len(ciphertext_tag) < 16: - return b"" - ciphertext = ciphertext_tag[:-16] - tag = ciphertext_tag[-16:] + # Entropie: signaux faibles (ne doivent jamais suffire à rendre positif tout seuls) + ent_tag = calculer_entropie(tag) + if ent_tag > 7.2: + score += 0.10 + if len(corps) > 0 and calculer_entropie(corps) > 7.0: + score += 0.10 + # Nonce aléatoire plausible (faible poids) + try: + ent_nonce = calculer_entropie(nonce) + if ent_nonce > 7.0: + score += 0.08 + else: + # Nonce peu aléatoire: contre-signal pour GCM + score -= 0.10 + except Exception: + pass - # Déchiffrement AES-GCM - cipher = Cipher(algorithms.AES(cle_donnee), modes.GCM(nonce, tag)) - decryptor = cipher.decryptor() + # Cas ambigu : nonce/tag semblent aléatoires mais le corps est aligné sur 16 octets → pénalité supplémentaire + try: + if (ent_nonce if 'ent_nonce' in locals() else 0) > 7.0 and ent_tag > 7.2 and (len(corps) % 16) == 0: + score -= 0.10 + except Exception: + pass + + # Normalisation, on borne toujours le score dans [0, 1] + if score < 0.0: + score = 0.0 + if score > 1.0: + score = 1.0 + return score + + except FileNotFoundError: + print(f"Erreur : Le fichier '{chemin_fichier_chiffre}' est introuvable.") + return 0.0 + except Exception as e: + print(f"Erreur lors de l'identification de l'algorithme AES GCM: {e}") + return 0.0 + + def dechiffrer(self, chemin_fichier_chiffre: str, cle_donnee: bytes) -> bytes: + """ + Déchiffre le fichier chiffré avec la clé donnée. + + Args: + chemin_fichier_chiffre(str): Le chemin vers le fichier chiffré. + cle_donnee(bytes): La clé de déchiffrement. + + Returns: + bytes: Le contenu déchiffré ou une chaîne vide en cas d'échec. + """ try: - plaintext = decryptor.update(ciphertext) + decryptor.finalize() - return plaintext - except Exception: - # Tag invalide / clé incorrecte - return b"" + # Validation taille de clé: AES-256 => 32 octets + if len(cle_donnee) != self._PBKDF2_LONGUEUR_CLE: + raise ValueError("Erreur : La clé AES-256 doit faire 32 bytes") + + # Lecture du fichier: nonce (12B) + données + tag (16B) + with open(chemin_fichier_chiffre, "rb") as f: + donnees = f.read() + + if len(donnees) < 12 + 16: + return b"" + + nonce = donnees[:12] + ciphertext_tag = donnees[12:] + if len(ciphertext_tag) < 16: + return b"" + ciphertext = ciphertext_tag[:-16] + tag = ciphertext_tag[-16:] + + # Déchiffrement AES-GCM + cipher = Cipher(algorithms.AES(cle_donnee), modes.GCM(nonce, tag)) + decryptor = cipher.decryptor() + try: + plaintext = decryptor.update(ciphertext) + decryptor.finalize() + return plaintext + except Exception: + # Tag invalide / clé incorrecte + return b"" - except FileNotFoundError: - raise - except ValueError as e: - # Erreur de validation de clé - if "doit faire 32 bytes" in str(e): + except FileNotFoundError: raise - return b"" - except Exception as e: - # Erreur générique - return b"" + except ValueError as e: + # Erreur de validation de clé + if "doit faire 32 bytes" in str(e): + raise + return b"" + except Exception as e: + # Erreur générique + return b"" \ No newline at end of file diff --git a/src/analyzers/blowfish_analyzer.py b/src/analyzers/blowfish_analyzer.py index 0533fc9..226eccf 100644 --- a/src/analyzers/blowfish_analyzer.py +++ b/src/analyzers/blowfish_analyzer.py @@ -24,11 +24,12 @@ class Blowfish_Analyzer(CryptoAnalyzer): def identifier_algo(self, chemin_fichier_chiffre: str) -> float: ''' - Détermine la probabilité que l'algo de chiffrement utilisé soit blowfish en: + Estime la probabilité que le fichier soit chiffré avec Blowfish (mode par blocs de 8 octets). - - vérifiant la présence d'un IV à l'en-tête (taille fichier > 8 octets) et que la taille du fichier est un multiple de 8 (blocs de 8 octets pour l'algo blowfish) - - calculant l'entropie des données chiffrées - - calculant l'entropie des sous blocs + Principes: + - IV de 8 octets en tête, puis des données chiffrées multiples de 8 octets. + - On défavorise les motifs plus proches d'AES (multiples de 16). + - L'entropie est prise en compte avec un poids faible. Args: chemin_fichier_chiffre(str): Le chemin du fichier chiffré à traiter (mission1.enc). @@ -44,32 +45,32 @@ def identifier_algo(self, chemin_fichier_chiffre: str) -> float: taille_totale = len(contenu_fichier) TAILLE_IV = 8 - # Gates stricts Blowfish: total > 8, corps multiple de 8 + # Gardes Blowfish: fichier assez long pour contenir l'IV et corps multiple de 8 if taille_totale <= TAILLE_IV: return 0.0 donnees_chiffrees = contenu_fichier[TAILLE_IV:] if len(donnees_chiffrees) == 0 or (len(donnees_chiffrees) % 8) != 0: return 0.0 - # Base: structure Blowfish plausible + # Base: structure Blowfish plausible (IV 8B + corps %8) score += 0.35 - # Favoriser quand la taille totale n'est pas multiple de 16 (moins AES-like) + # Bonus si la taille totale n'est pas multiple de 16 (moins "AES-like") if taille_totale % 16 != 0: score += 0.25 else: score -= 0.35 - # Pénaliser si les données chiffrées (hors IV) sont multiples de 16 (plus AES-like) + # Pénalité si le corps (hors IV) est multiple de 16 (motif plus proche d'AES) if len(donnees_chiffrees) % 16 == 0: score -= 0.25 - # Entropie: faible poids + # Entropie: signal faible, bonus léger si globalement élevée try: entropie_globale = calculer_entropie(donnees_chiffrees) if entropie_globale > 7.3: score += 0.15 - # Pattern par sous-blocs (léger bonus) + # Vérification sur deux moitiés (léger bonus si les deux sont élevées) taille_donnees = len(donnees_chiffrees) moitie = taille_donnees // 2 entropie_moitie1 = calculer_entropie(donnees_chiffrees[:moitie]) @@ -82,7 +83,7 @@ def identifier_algo(self, chemin_fichier_chiffre: str) -> float: except FileNotFoundError: return 0.0 - # Clamp [0,1] + # Normalisation: on borne toujours le score dans [0, 1] if score < 0.0: score = 0.0 if score > 1.0: diff --git a/src/analyzers/chacha20_analyzer.py b/src/analyzers/chacha20_analyzer.py index 875e9c4..8cc092d 100644 --- a/src/analyzers/chacha20_analyzer.py +++ b/src/analyzers/chacha20_analyzer.py @@ -33,19 +33,20 @@ class ChaCha20_Analyzer(CryptoAnalyzer): def identifier_algo(self, chemin_fichier_chiffre: str) -> float: """ - Détermine la probabilité que l'algo de chiffrement utilisé soit l'ChaCha20 en: - - vérifiant la présence d'un nonce de 12 bytes en début de fichier - - vérifiant l'entropie très élevée sur l'ensemble des données - - vérifiant l'absence de padding (pas de contrainte de taille) - - vérifiant que la taille du fichier est suffisante pour contenir un nonce + Estime la probabilité que le fichier soit chiffré avec ChaCha20. - Retourne une probabilité entre 0 et 1 (Pour connaitre la probabilité que l'algo de chiffrement utilisé soit l'ChaCha20). + Idée générale: + - Nonce (12 octets) en tête, puis données chiffrées sans format de bloc (flux). + - En flux, la taille du corps n'est typiquement pas multiple de 8 ni de 16. + - On pénalise un motif AEAD très net (queue de 16 octets très aléatoire + nonce aléatoire), typique de GCM. + - L'entropie est un signal faible, la structure prime. + + Retourne un score entre 0.0 et 1.0. Args: - chemin_fichier_chiffre(str): le chemin du fichier chiffré à traiter. - + chemin_fichier_chiffre (str): Chemin du fichier chiffré à analyser. Returns: - float: La probabilité que l'algo de chiffrement utilisé soit l'ChaCha20 après le calcul. + float: Probabilité estimée que l'algorithme soit ChaCha20. """ try: with open(chemin_fichier_chiffre, 'rb') as f: @@ -63,36 +64,41 @@ def identifier_algo(self, chemin_fichier_chiffre: str) -> float: # Composantes de score score: float = 0.0 - # Pondération révisée: structure flux > entropie - # 1) Tailles bloc (fortes pénalités contre les modes par blocs) + # Pondération: structure de flux > entropie + # 1) Tailles de blocs: fortes pénalités contre les modes par blocs taille_donnees: int = len(corps) if taille_donnees % 16 == 0: score -= 0.40 elif taille_donnees % 8 == 0: score -= 0.20 else: - score += 0.40 # flux typique - # Bonus très léger supplémentaire pour flux typique (ni %8 ni %16) + score += 0.50 # flux typique + # Bonus très léger supplémentaire pour flux (ni %8 ni %16) score += 0.05 - # 2) AEAD-like tag en fin (fort négatif contre GCM) + # 2) Queue de 16 octets très aléatoire (tag AEAD probable): + # pénalité forte seulement si combinée avec nonce très aléatoire et corps suffisant. queue16: bytes = corps[-16:] if len(corps) >= 16 else b"" if queue16: try: ent_queue = calculer_entropie(queue16) - if ent_queue > 7.2: - score -= 0.30 + # Pénalité forte uniquement si le pattern (nonce 12B + queue 16B) est très net et corps significatif + if ent_queue > 7.2 and 'ent_nonce' in locals() and ent_nonce > 7.0 and len(corps) >= 32: + score -= 0.45 elif ent_queue <= 7.0: # Queue ressemblant moins à un tag AEAD → léger bonus score += 0.10 + else: + # Sinon, ne pas sur-pénaliser les queues aléatoires typiques du flux + score += 0.00 except Exception: pass - # 3) Taille totale non multiple de 16 (bonus léger) + # 3) Taille totale non multiple de 16 (bonus léger pour un flux) if len(donnees) % 16 != 0: - score += 0.10 + score += 0.15 - # 4) Entropie: signaux faibles + # 4) Entropie: signaux faibles, ne doivent pas dominer le score try: ent_corp: float = calculer_entropie(corps) if ent_corp > 7.0: @@ -103,7 +109,15 @@ def identifier_algo(self, chemin_fichier_chiffre: str) -> float: except Exception: pass - # Clamp [0,1] + # Pénalité additionnelle si la queue ressemble à un tag AEAD ET le nonce paraît aléatoire (pattern GCM) + try: + ent_queue2 = calculer_entropie(corps[-16:]) if len(corps) >= 16 else 0.0 + if ent_queue2 > 7.2 and 'ent_nonce' in locals() and ent_nonce > 7.0: + score -= 0.10 + except Exception: + pass + + # Normalisation: on borne toujours le score dans [0, 1] if score < 0.0: score = 0.0 if score > 1.0: diff --git a/src/analyzers/fernet_analyzer.py b/src/analyzers/fernet_analyzer.py index 768d022..bf15fde 100644 --- a/src/analyzers/fernet_analyzer.py +++ b/src/analyzers/fernet_analyzer.py @@ -29,17 +29,16 @@ class FernetAnalyzer(CryptoAnalyzer): def identifier_algo(self, chemin_fichier_chiffre: str) -> float: """ - Détermine la probabilité que l'algo de chiffrement soit Fernet en vérifiant - le format Base64 URL-safe, le byte de version et la structure du jeton. + Estime la probabilité que le fichier soit un jeton Fernet valide. - Heuristiques utilisées: - - Format Base64 URL-safe: 30% du score - - Taille minimale: 20% du score - - Version correcte: 30% du score - - Horodatage valide: 20% du score + Étapes vérifiées (pondérations indiquées): + - Encodage Base64 URL-safe (0.30): le contenu doit se décoder sans erreur. + - Taille minimale (0.20): une trame Fernet plausible doit dépasser un seuil. + - Byte de version (0x80) (0.30): premier octet attendu. + - Horodatage réaliste (0.20): timestamp > 2020 et ≤ maintenant. Args: - chemin_fichier_chiffre (str): Le chemin du fichier chiffré à traiter. + chemin_fichier_chiffre (str): Le chemin du fichier chifré à traiter. Returns: float: Score de probabilité entre 0.0 et 1.0. @@ -50,29 +49,29 @@ def identifier_algo(self, chemin_fichier_chiffre: str) -> float: with open(chemin_fichier_chiffre, "rb") as f: contenu_fichier = f.read() - # 1. Vérification du format Base64 URL-safe. + # 1) Le contenu doit être décodable en Base64 URL-safe (sinon ce n'est pas Fernet). contenu_decode_bytes = base64.urlsafe_b64decode(contenu_fichier) score += 0.3 - # 2. Vérification de la taille minimale. + # 2) Taille minimale d'un token Fernet plausible. if len(contenu_decode_bytes) >= self._FERNET_MIN_TAILLE: score += 0.2 else: return 0.0 - # 3. Vérification du premier octet (version 0x80). + # 3) Premier octet = byte de version (0x80) attendu. premier_octet = contenu_decode_bytes[:1] if premier_octet == self._FERNET_VERSION: score += 0.3 else: return 0.0 - # 4. Vérification de l'horodatage. + # 4) Horodatage: doit être dans une plage réaliste. horodatage_bytes = contenu_decode_bytes[1:9] horodatage_entier = int.from_bytes(horodatage_bytes, 'big') - # Vérifie que le timestamp est dans une marge réaliste (après 2020 et avant l'heure actuelle). - # 1577836800 correspond au 1er janvier 2020. + # Vérifie que le timestamp est réaliste (après 2020 et avant l'heure actuelle). + # 1577836800 = 1er janvier 2020. if horodatage_entier > 1577836800 and horodatage_entier <= time.time(): score += 0.2 else: @@ -83,7 +82,7 @@ def identifier_algo(self, chemin_fichier_chiffre: str) -> float: except (binascii.Error, ValueError): return 0.0 - # Clamp [0,1] + # Normalisation: on borne toujours le score dans [0, 1] if score < 0.0: score = 0.0 if score > 1.0: diff --git a/src/detecteur_crypto.py b/src/detecteur_crypto.py index 4c14803..b156009 100644 --- a/src/detecteur_crypto.py +++ b/src/detecteur_crypto.py @@ -1,407 +1,407 @@ -# Import des modules -import os -import time -from typing import List, Union -from pathlib import Path -from rich.progress import Progress -# Import des modules d'analyse -from src.analyzers.aes_cbc_analyzer import Aes_Cbc_Analyzer -from src.crypto_analyzer import CryptoAnalyzer -from src.analyzers.chacha20_analyzer import ChaCha20_Analyzer -from src.analyzers.blowfish_analyzer import Blowfish_Analyzer -from src.analyzers.aes_gcm_analyzer import Aes_Gcm_Analyzer -from src.analyzers.fernet_analyzer import FernetAnalyzer -from src.rapport_mission import rapport_mission -# Import des modules utilitaries -from src.utils import verifier_texte_dechiffre -from rich.progress import Progress -from rich.markdown import Markdown -from rich.console import Console -class ResultatAnalyse: - """ - Classe représentant un résultat d'analyse. - """ - def __init__(self, algo: str, cle: bytes, score_probabilite: float, texte_dechiffre: bytes, temps_execution: float = 0.0, nb_tentatives: int = 0, fichier: str ='', taux_succes: float = 0.0): - self.algo = algo - self.cle = cle - self.score_probabilite = score_probabilite - self.texte_dechiffre = texte_dechiffre - self.temps_execution = temps_execution - self.nb_tentatives = nb_tentatives - self.fichier = fichier, - self.taux_succes = taux_succes -class DetecteurCryptoOrchestrateur: - """ - Classe principale qui centralise tout: - -Lance l'analyse des fichiers et identifie l'algorithme probable, - -Lance les attaques par dictionnaire, - -Lance et coordonnes le processus de dechiffrement - """ - - _NBR_OPERATION_MISSION = 4 - _NBR_OPERATION_ANALYSE = 3 - - def __init__(self): - """ - Initialisation de tous les modules d'analyse disponibles - """ - self.analyzers: dict[str, CryptoAnalyzer] = { - "AES-CBC-256": Aes_Cbc_Analyzer(), - "ChaCha20": ChaCha20_Analyzer(), - "Blowfish": Blowfish_Analyzer(), - "AES-GCM": Aes_Gcm_Analyzer(), - "Fernet": FernetAnalyzer(), - } - self.missions_completees: list[dict[str, Union[str, list[ResultatAnalyse], float]]] = [] - self.statistiques_globales: dict[str, Union[int, float]] = { - "total_fichiers": 0, - "fichiers_dechiffres": 0, - "temps_total": 0.0, - "tentatives_total": 0 - } - - def analyser_fichier_specifique(self, chemin_fichier_chiffre: str, progress : Progress, task, error:bool, nbr_opr_mission: int) -> ResultatAnalyse: - """ - ANALYSE D'UN FICHIER SPÉCIFIQUE - - Sélection du fichier à analyser - - Identification automatique de l'algorithme - - Affichage des scores de probabilité - - Args: - chemin_fichier_chiffre(str): chemin du fichier chiffré à analyser - progress (Progress) : la progress bar à mettre à jour - error(bool): nécessaire pour déterminer les erreurs et définir le message de final de la progress bar - Returns: - ResultatAnalyse: résultat de l'analyse - """ - debut_analyse = time.time() - - try: - # Vérification de l'existence du fichier - avance = (100/(self._NBR_OPERATION_ANALYSE * nbr_opr_mission)) - time.sleep(0.3) # Done : Intégrer la progress bar -> step : Verification du chemin de fichier fourni - progress.update(task_id=task, description="Verification du chemin de fichier fourni", advance=avance * 0.3) - time.sleep(1) - - if not os.path.isfile(Path('data')/f"{chemin_fichier_chiffre}"): - time.sleep(0.3) # TODO : Intégrer la progress bar -> step : Verification du chemin de fichier fourni - progress.update(task_id=task, description="Fichier non trouvé ❌ (Aborting...)", advance=((avance * self._NBR_OPERATION_ANALYSE) - avance * 0.3) ) - time.sleep(1) - error = True - return ResultatAnalyse("", b"", 0.0, b"", 0.0, 0) - - # Initialisation des variables - time.sleep(0.5) # TODO : Mise à jour de la progress bar -> step : Initialisation des utilitaires pour l'identification - progress.update(task_id=task, description="Initialisation des utilitaires pour l'identification", advance=avance*0.2) - time.sleep(1) - - algorithme_detecte = "" - cle = b"" - score_probabilite = 0.0 - texte_dechiffre = b"" - nb_tentatives = 0 - - # Parcours des algorithmes disponibles - scores_algorithmes = {} - - # Pour les arrêts en cas d'erreurs, servira à upgrade la progress bar - cumul_progress_avance = 0 - - for nom_algo, analyzer in self.analyzers.items(): - avance_algo = avance/(len(self.analyzers)*3 * 0.5) - time.sleep(0.5) # TODO : Mise à jour de la progress bar -> step : Utilisation de {algrorithme} pour déterminer le chiffrement - progress.update(task_id=task, description=f"Utilisation de {nom_algo} pour déterminer le chiffrement", advance=avance_algo) - time.sleep(1) - - score = analyzer.identifier_algo(f"data/{chemin_fichier_chiffre}") - scores_algorithmes[nom_algo] = score - - time.sleep(0.5) # TODO : Mise à jour de la progress bar -> step : Analyse des résultats d'identification - progress.update(task_id=task, description="Analyse des résultats d'identification", advance=avance_algo) - time.sleep(1) - - cumul_progress_avance += 2 * avance_algo - - if score > 0.9 : # Seuil de confiance - time.sleep(1) # TODO : Mise à jour de la progress bar -> step : Détection réussie pour {algorithme} et préparation du rapport d'analyse - progress.update(task_id=task, description=f"Détection réussie pour {nom_algo} et préparation du rapport d'analyse", advance=((100/nbr_opr_mission) - cumul_progress_avance)) - time.sleep(1) - - algorithme_detecte = nom_algo - score_probabilite = score - break - else : - time.sleep(1) # TODO : Intégrer la progress bar -> step : Echec d'identification pour {algorithme} - progress.update(task_id=task, description=f"Echec d'identification pour {nom_algo}", advance=avance_algo) - time.sleep(1) - cumul_progress_avance += avance_algo - - if not algorithme_detecte: - print("Aucun algorithme correctement détecté ") - temps_execution = time.time() - debut_analyse - return ResultatAnalyse("", b"", 0.0, b"", temps_execution, nb_tentatives, chemin_fichier_chiffre, 0) - - temps_execution = time.time() - debut_analyse - - return ResultatAnalyse(algorithme_detecte, cle, score_probabilite, texte_dechiffre, temps_execution, nb_tentatives, chemin_fichier_chiffre, 0) - - except Exception as e: - print(f"Erreur lors de l'analyse: {str(e)}") - temps_execution = time.time() - debut_analyse - error = True - return ResultatAnalyse("", b"", 0.0, b"", temps_execution, 0, chemin_fichier_chiffre) - - def __tenter_dechiffrement_avec_dictionnaire(self, chemin_fichier: str, cles_candidates: list[bytes], analyzer: CryptoAnalyzer, resultat: ResultatAnalyse): - """ - Tente de déchiffrer un fichier avec les clés candidates et l'analyzer correspondant - - Args: - chemin_fichier(str) : chemin vers le fichier - cles_candidates(list[bytes]) : les clés candidates retenus par le dossier de clés sur la base des indices - analyzer(CryptoAnalyzer) : l'Analyzer correspondant à ce fichier - resultat(ResultatAnalyse) : les résultats de l'analyse de fichier - - Returns : - bool : si une erreur est survenue ou non - """ - for j, cle in enumerate(cles_candidates): - resultat.nb_tentatives += 1 - - # Déchiffrement et normalisation de l'affichage (évite les \x.. et caractères non imprimables) - donnees = analyzer.dechiffrer(chemin_fichier, cle) - texte_dechiffre = donnees.decode('utf-8', errors='ignore').replace('\x00', ' ') - succes = verifier_texte_dechiffre(texte_dechiffre)['taux_succes'] - - if texte_dechiffre and succes > 60 and len(texte_dechiffre) > 0: - resultat.cle = cle - resultat.texte_dechiffre = texte_dechiffre - resultat.taux_succes = succes - print(f"Clé trouvée après {j+1} tentatives!") - return False - - print("Aucune clé valide trouvée") - return True - - def mission_complete_automatique(self, dossier_chiffres: str, chemin_dictionnaire: str) -> List[ResultatAnalyse]: - """ - MISSION COMPLÈTE AUTOMATIQUE - - Analyse des 5 fichiers séquentiellement - - Tentatives de déchiffrement avec retour visuel - - Rapport de synthèse final - - Args: - dossier_chiffres(str): dossier contenant les fichiers chiffrés - - Returns: - list[ResultatAnalyse]: liste des résultats d'analyse - """ - - debut_mission = time.time() - resultats: list[ResultatAnalyse] = [] - try: - with Progress() as progress : - # Récupération des fichiers .enc - fichiers_enc = [f for f in os.listdir(dossier_chiffres) if f.endswith(".enc")] - - if not fichiers_enc: - print("Aucun fichier .enc trouvé dans le dossier") - return [] - - print(f"{len(fichiers_enc)} fichiers .enc détectés") - print("\nANALYSE SÉQUENTIELLE DES FICHIERS") - time.sleep(0.5) - for i, fichier in enumerate(fichiers_enc, 0): - print(f"\nFICHIER {i+1}/{len(fichiers_enc)}: {fichier}") - - # TODO: New progress bar -> step: Analyse du fichier mission{i+1}.enc - task = progress.add_task(f"Analyse du fichier mission{i+1}.enc...", total=100) - time.sleep(0.5) - - chemin_fichier = os.path.join(dossier_chiffres, fichier) - - # Analyse du fichier - error = False - resultat = self.analyser_fichier_specifique(fichier, progress, task, error, self._NBR_OPERATION_MISSION) - - # Tentative de déchiffrement si algorithme détecté - if resultat.algo: - # TODO: MAJ de la progress bar -> step: Amorçage de la phase de déchiffrement - progress.update(task, description="Amorçage de la phase de déchiffrement...", advance=((100/self._NBR_OPERATION_MISSION) * 0.5)) - time.sleep(1) - - analyzer = self.analyzers[resultat.algo] - - # TODO: MAJ de la progress bar -> step: Récupération des clés candidates - progress.update(task, description="Récupération des clés candidates", advance=(100/self._NBR_OPERATION_MISSION)*0.5) - time.sleep(1) - - cles_candidates = analyzer.generer_cles_candidates(chemin_dictionnaire) - - if cles_candidates: - print(f"Test de {len(cles_candidates)} clés candidates...") - # TODO: MAJ de la progress bar -> step: Test de déchiffrement - progress.update(task, description="Test de déchiffrement", advance=(100/self._NBR_OPERATION_MISSION)) - time.sleep(3) - - error = self.__tenter_dechiffrement_avec_dictionnaire(chemin_fichier, cles_candidates, analyzer, resultat) - - else: - # TODO: MAJ de la progress bar -> step: Abort et récupération des résultats d'analyse - progress.update(task, description="Aucune clé candidate générée ❌ (Aborting ...)", advance=(100/self._NBR_OPERATION_MISSION)) - time.sleep(3) - error = True - - - resultats.append(resultat) - - # retour visuel - if resultat.algo: - # TODO: MAJ de la progress bar -> step: Finalsation et retour de résultats - progress.update(task, description="Finalisation et retour des résultats", advance=(100/self._NBR_OPERATION_MISSION)) - time.sleep(3) - - print(f"{fichier}: {resultat.algo} (score: {resultat.score_probabilite:.2f})") - - message = "[bold green] Mission terminée. ✅[/bold green]\n\n" if not error else "[bold red] Mission terminée: Déchiffrement non concluant. ❌ [/bold red]\n\n" - Console().print(message) - else: - progress.update(task, description="Aborting et récupération des résultats d'analyse...", advance=100) - time.sleep(0.5) # TODO: MAJ de la progress bar -> step: Abort et récupération des résultats d'analyse - Console().print(f"[bold yellow] Mission terminée: Aucun algorithme détecté. ⚠️[/bold yellow]\n\n") - - progress.remove_task(task) - - # Rapport de synthèse final - with Progress() as progress : - task = progress.add_task("Préparation des rapports", total=100) # TODO: New progress bar -> step: Préparation des rapports (1 to 100%) - - while not progress.finished : - progress.update(task, description="Préparation des rapports", advance=2) - time.sleep(0.2) - for i in range(len(fichiers_enc)) : - resultat = { - 'algorithme': resultats[i].algo, - 'fichier': resultats[i].fichier, - 'cle': resultats[i].cle, - 'tentatives': resultats[i].nb_tentatives, - 'temps_execution': resultats[i].temps_execution, - 'taux_succes': resultats[i].taux_succes, - 'statut_succes' : 'Succès' if resultats[i].taux_succes > 60 else 'Echec', - 'texte_dechiffre' : resultats[i].texte_dechiffre - } - rapport_mission().generer_rapport_synthese(resultat) - progress.update(task, description="Mission complète effectuée.") # TODO: MAJ de la progress bar -> step: Mission complète effectuée - - # Mise à jour des statistiques globales - self.missions_completees.append({ - "dossier": dossier_chiffres, - "resultats": resultats, - "temps_total": time.time() - debut_mission - }) - - return resultats - - except Exception as e: - print(f"Erreur lors de la mission complète: {str(e)}") - return [] - - - def attaque_dictionnaire_manuelle(self, chemin_fichier: str, algorithme_choisi: str, chemin_dictionnaire: str) -> ResultatAnalyse: - """ - ATTAQUE PAR DICTIONNAIRE MANUELLE - - Choix du fichier et de l'algorithme - - Suivi en temps réel des tentatives - - Affichage des résultats intermédiaires - - Args: - chemin_fichier(str): chemin du fichier à attaquer - algorithme_choisi(str): algorithme à utiliser - - Returns: - ResultatAnalyse: résultat de l'attaque - """ - - - debut_attaque = time.time() - resultat = ResultatAnalyse("", b"", 0.0, b"", 0.0, 0) - - try: - if algorithme_choisi not in self.analyzers: - print(f"Algorithme {algorithme_choisi} non disponible") - return resultat - - analyzer = self.analyzers[algorithme_choisi] - - # Vérification de l'algorithme - score = analyzer.identifier_algo(chemin_fichier) - resultat.score_probabilite = score - resultat.algo = algorithme_choisi - print(f"Score de confirmation: {score:.2f}") - - if score < 0.3: - print("Score de confiance faible pour cet algorithme") - - # Génération des clés candidates - print(f"Génération des clés candidates") - cles_candidates = analyzer.generer_cles_candidates(chemin_dictionnaire) - print(f"{len(cles_candidates)} clés candidates générées") - - # Attaque par dictionnaire - - self.__tenter_dechiffrement_avec_dictionnaire(chemin_fichier, cles_candidates, analyzer, resultat) - - - temps_execution = time.time() - debut_attaque - resultat.temps_execution = temps_execution - print(f"Temps d'exécution: {temps_execution:.2f} secondes") - - return resultat - - except Exception as e: - print(f"Erreur lors de l'attaque: {str(e)}") - temps_execution = time.time() - debut_attaque - return ResultatAnalyse("", b"", 0.0, b"", temps_execution, 0) - - def attaque_dictionnaire(self,chemin_fichier_chiffrer: str, algo : str, chemin_dico : str = "keys/wordlist.txt"): - - with Progress() as progress: - analyzer = self.analyzers[algo] - - cle_candidates = analyzer.generer_cles_candidates(chemin_dico) - - with open(chemin_dico,'r') as d: - dico = d.readlines() - - with open(f"data/{chemin_fichier_chiffrer}",'rb') as f : - texte_chiffrer = f.read() - - task_id = progress.add_task("Testing...",total=len(cle_candidates)) - - current_task = 0 - - advance = 1 - - - while current_task < len(cle_candidates) : - time.sleep(0.5) - - essai_dechiffrage = analyzer.dechiffrer(f"data/{chemin_fichier_chiffrer}", cle_candidates[current_task]) - - if essai_dechiffrage != b"" : - - progress.update(task_id,advance=len(cle_candidates) - current_task) - - # Retourner un texte décodé/nettoyé pour affichage propre - return essai_dechiffrage.decode('utf-8', errors='ignore').replace('\x00', ' ') - - current_task+=1 - - progress.update(task_id,advance=advance) - - - return "Aucune clé trouvé" - - # print("\n Process is done ...") - - -print(DetecteurCryptoOrchestrateur().attaque_dictionnaire("mission1.enc","AES-CBC-256")) +# Import des modules +import os +import time +from typing import List, Union +from pathlib import Path +from rich.progress import Progress +# Import des modules d'analyse +from src.analyzers.aes_cbc_analyzer import Aes_Cbc_Analyzer +from src.crypto_analyzer import CryptoAnalyzer +from src.analyzers.chacha20_analyzer import ChaCha20_Analyzer +from src.analyzers.blowfish_analyzer import Blowfish_Analyzer +from src.analyzers.aes_gcm_analyzer import Aes_Gcm_Analyzer +from src.analyzers.fernet_analyzer import FernetAnalyzer +from src.rapport_mission import rapport_mission +# Import des modules utilitaries +from src.utils import verifier_texte_dechiffre +from rich.progress import Progress +from rich.markdown import Markdown +from rich.console import Console +class ResultatAnalyse: + """ + Classe représentant un résultat d'analyse. + """ + def __init__(self, algo: str, cle: bytes, score_probabilite: float, texte_dechiffre: bytes, temps_execution: float = 0.0, nb_tentatives: int = 0, fichier: str ='', taux_succes: float = 0.0): + self.algo = algo + self.cle = cle + self.score_probabilite = score_probabilite + self.texte_dechiffre = texte_dechiffre + self.temps_execution = temps_execution + self.nb_tentatives = nb_tentatives + self.fichier = fichier, + self.taux_succes = taux_succes +class DetecteurCryptoOrchestrateur: + """ + Classe principale qui centralise tout: + -Lance l'analyse des fichiers et identifie l'algorithme probable, + -Lance les attaques par dictionnaire, + -Lance et coordonnes le processus de dechiffrement + """ + + _NBR_OPERATION_MISSION = 4 + _NBR_OPERATION_ANALYSE = 3 + + def __init__(self): + """ + Initialisation de tous les modules d'analyse disponibles + """ + self.analyzers: dict[str, CryptoAnalyzer] = { + "AES-CBC-256": Aes_Cbc_Analyzer(), + "ChaCha20": ChaCha20_Analyzer(), + "Blowfish": Blowfish_Analyzer(), + "AES-GCM": Aes_Gcm_Analyzer(), + "Fernet": FernetAnalyzer(), + } + self.missions_completees: list[dict[str, Union[str, list[ResultatAnalyse], float]]] = [] + self.statistiques_globales: dict[str, Union[int, float]] = { + "total_fichiers": 0, + "fichiers_dechiffres": 0, + "temps_total": 0.0, + "tentatives_total": 0 + } + + def analyser_fichier_specifique(self, chemin_fichier_chiffre: str, progress : Progress, task, error:bool, nbr_opr_mission: int) -> ResultatAnalyse: + """ + ANALYSE D'UN FICHIER SPÉCIFIQUE + - Sélection du fichier à analyser + - Identification automatique de l'algorithme + - Affichage des scores de probabilité + + Args: + chemin_fichier_chiffre(str): chemin du fichier chiffré à analyser + progress (Progress) : la progress bar à mettre à jour + error(bool): nécessaire pour déterminer les erreurs et définir le message de final de la progress bar + Returns: + ResultatAnalyse: résultat de l'analyse + """ + debut_analyse = time.time() + + try: + # Vérification de l'existence du fichier + avance = (100/(self._NBR_OPERATION_ANALYSE * nbr_opr_mission)) + time.sleep(0.3) # Done : Intégrer la progress bar -> step : Verification du chemin de fichier fourni + progress.update(task_id=task, description="Verification du chemin de fichier fourni", advance=avance * 0.3) + time.sleep(1) + + if not os.path.isfile(Path('data')/f"{chemin_fichier_chiffre}"): + time.sleep(0.3) # TODO : Intégrer la progress bar -> step : Verification du chemin de fichier fourni + progress.update(task_id=task, description="Fichier non trouvé ❌ (Aborting...)", advance=((avance * self._NBR_OPERATION_ANALYSE) - avance * 0.3) ) + time.sleep(1) + error = True + return ResultatAnalyse("", b"", 0.0, b"", 0.0, 0) + + # Initialisation des variables + time.sleep(0.5) # TODO : Mise à jour de la progress bar -> step : Initialisation des utilitaires pour l'identification + progress.update(task_id=task, description="Initialisation des utilitaires pour l'identification", advance=avance*0.2) + time.sleep(1) + + algorithme_detecte = "" + cle = b"" + score_probabilite = 0.0 + texte_dechiffre = b"" + nb_tentatives = 0 + + # Parcours des algorithmes disponibles + scores_algorithmes = {} + + # Pour les arrêts en cas d'erreurs, servira à upgrade la progress bar + cumul_progress_avance = 0 + + for nom_algo, analyzer in self.analyzers.items(): + avance_algo = avance/(len(self.analyzers)*3 * 0.5) + time.sleep(0.5) # TODO : Mise à jour de la progress bar -> step : Utilisation de {algrorithme} pour déterminer le chiffrement + progress.update(task_id=task, description=f"Utilisation de {nom_algo} pour déterminer le chiffrement", advance=avance_algo) + time.sleep(1) + + score = analyzer.identifier_algo(f"data/{chemin_fichier_chiffre}") + scores_algorithmes[nom_algo] = score + + time.sleep(0.5) # TODO : Mise à jour de la progress bar -> step : Analyse des résultats d'identification + progress.update(task_id=task, description="Analyse des résultats d'identification", advance=avance_algo) + time.sleep(1) + + cumul_progress_avance += 2 * avance_algo + + if score > 0.9 : # Seuil de confiance + time.sleep(1) # TODO : Mise à jour de la progress bar -> step : Détection réussie pour {algorithme} et préparation du rapport d'analyse + progress.update(task_id=task, description=f"Détection réussie pour {nom_algo} et préparation du rapport d'analyse", advance=((100/nbr_opr_mission) - cumul_progress_avance)) + time.sleep(1) + + algorithme_detecte = nom_algo + score_probabilite = score + break + else : + time.sleep(1) # TODO : Intégrer la progress bar -> step : Echec d'identification pour {algorithme} + progress.update(task_id=task, description=f"Echec d'identification pour {nom_algo}", advance=avance_algo) + time.sleep(1) + cumul_progress_avance += avance_algo + + if not algorithme_detecte: + print("Aucun algorithme correctement détecté ") + temps_execution = time.time() - debut_analyse + return ResultatAnalyse("", b"", 0.0, b"", temps_execution, nb_tentatives, chemin_fichier_chiffre, 0) + + temps_execution = time.time() - debut_analyse + + return ResultatAnalyse(algorithme_detecte, cle, score_probabilite, texte_dechiffre, temps_execution, nb_tentatives, chemin_fichier_chiffre, 0) + + except Exception as e: + print(f"Erreur lors de l'analyse: {str(e)}") + temps_execution = time.time() - debut_analyse + error = True + return ResultatAnalyse("", b"", 0.0, b"", temps_execution, 0, chemin_fichier_chiffre) + + def __tenter_dechiffrement_avec_dictionnaire(self, chemin_fichier: str, cles_candidates: list[bytes], analyzer: CryptoAnalyzer, resultat: ResultatAnalyse): + """ + Tente de déchiffrer un fichier avec les clés candidates et l'analyzer correspondant + + Args: + chemin_fichier(str) : chemin vers le fichier + cles_candidates(list[bytes]) : les clés candidates retenus par le dossier de clés sur la base des indices + analyzer(CryptoAnalyzer) : l'Analyzer correspondant à ce fichier + resultat(ResultatAnalyse) : les résultats de l'analyse de fichier + + Returns : + bool : si une erreur est survenue ou non + """ + for j, cle in enumerate(cles_candidates): + resultat.nb_tentatives += 1 + + # Déchiffrement et normalisation de l'affichage (évite les \x.. et caractères non imprimables) + donnees = analyzer.dechiffrer(chemin_fichier, cle) + texte_dechiffre = donnees.decode('utf-8', errors='ignore').replace('\x00', ' ') + succes = verifier_texte_dechiffre(texte_dechiffre)['taux_succes'] + + if texte_dechiffre and succes > 60 and len(texte_dechiffre) > 0: + resultat.cle = cle + resultat.texte_dechiffre = texte_dechiffre + resultat.taux_succes = succes + print(f"Clé trouvée après {j+1} tentatives!") + return False + + print("Aucune clé valide trouvée") + return True + + def mission_complete_automatique(self, dossier_chiffres: str, chemin_dictionnaire: str) -> List[ResultatAnalyse]: + """ + MISSION COMPLÈTE AUTOMATIQUE + - Analyse des 5 fichiers séquentiellement + - Tentatives de déchiffrement avec retour visuel + - Rapport de synthèse final + + Args: + dossier_chiffres(str): dossier contenant les fichiers chiffrés + + Returns: + list[ResultatAnalyse]: liste des résultats d'analyse + """ + + debut_mission = time.time() + resultats: list[ResultatAnalyse] = [] + try: + with Progress() as progress : + # Récupération des fichiers .enc + fichiers_enc = [f for f in os.listdir(dossier_chiffres) if f.endswith(".enc")] + + if not fichiers_enc: + print("Aucun fichier .enc trouvé dans le dossier") + return [] + + print(f"{len(fichiers_enc)} fichiers .enc détectés") + print("\nANALYSE SÉQUENTIELLE DES FICHIERS") + time.sleep(0.5) + for i, fichier in enumerate(fichiers_enc, 0): + print(f"\nFICHIER {i+1}/{len(fichiers_enc)}: {fichier}") + + # TODO: New progress bar -> step: Analyse du fichier mission{i+1}.enc + task = progress.add_task(f"Analyse du fichier mission{i+1}.enc...", total=100) + time.sleep(0.5) + + chemin_fichier = os.path.join(dossier_chiffres, fichier) + + # Analyse du fichier + error = False + resultat = self.analyser_fichier_specifique(fichier, progress, task, error, self._NBR_OPERATION_MISSION) + + # Tentative de déchiffrement si algorithme détecté + if resultat.algo: + # TODO: MAJ de la progress bar -> step: Amorçage de la phase de déchiffrement + progress.update(task, description="Amorçage de la phase de déchiffrement...", advance=((100/self._NBR_OPERATION_MISSION) * 0.5)) + time.sleep(1) + + analyzer = self.analyzers[resultat.algo] + + # TODO: MAJ de la progress bar -> step: Récupération des clés candidates + progress.update(task, description="Récupération des clés candidates", advance=(100/self._NBR_OPERATION_MISSION)*0.5) + time.sleep(1) + + cles_candidates = analyzer.generer_cles_candidates(chemin_dictionnaire) + + if cles_candidates: + print(f"Test de {len(cles_candidates)} clés candidates...") + # TODO: MAJ de la progress bar -> step: Test de déchiffrement + progress.update(task, description="Test de déchiffrement", advance=(100/self._NBR_OPERATION_MISSION)) + time.sleep(3) + + error = self.__tenter_dechiffrement_avec_dictionnaire(chemin_fichier, cles_candidates, analyzer, resultat) + + else: + # TODO: MAJ de la progress bar -> step: Abort et récupération des résultats d'analyse + progress.update(task, description="Aucune clé candidate générée ❌ (Aborting ...)", advance=(100/self._NBR_OPERATION_MISSION)) + time.sleep(3) + error = True + + + resultats.append(resultat) + + # retour visuel + if resultat.algo: + # TODO: MAJ de la progress bar -> step: Finalsation et retour de résultats + progress.update(task, description="Finalisation et retour des résultats", advance=(100/self._NBR_OPERATION_MISSION)) + time.sleep(3) + + print(f"{fichier}: {resultat.algo} (score: {resultat.score_probabilite:.2f})") + + message = "[bold green] Mission terminée. ✅[/bold green]\n\n" if not error else "[bold red] Mission terminée: Déchiffrement non concluant. ❌ [/bold red]\n\n" + Console().print(message) + else: + progress.update(task, description="Aborting et récupération des résultats d'analyse...", advance=100) + time.sleep(0.5) # TODO: MAJ de la progress bar -> step: Abort et récupération des résultats d'analyse + Console().print(f"[bold yellow] Mission terminée: Aucun algorithme détecté. ⚠️[/bold yellow]\n\n") + + progress.remove_task(task) + + # Rapport de synthèse final + with Progress() as progress : + task = progress.add_task("Préparation des rapports", total=100) # TODO: New progress bar -> step: Préparation des rapports (1 to 100%) + + while not progress.finished : + progress.update(task, description="Préparation des rapports", advance=2) + time.sleep(0.2) + for i in range(len(fichiers_enc)) : + resultat = { + 'algorithme': resultats[i].algo, + 'fichier': resultats[i].fichier, + 'cle': resultats[i].cle, + 'tentatives': resultats[i].nb_tentatives, + 'temps_execution': resultats[i].temps_execution, + 'taux_succes': resultats[i].taux_succes, + 'statut_succes' : 'Succès' if resultats[i].taux_succes > 60 else 'Echec', + 'texte_dechiffre' : resultats[i].texte_dechiffre + } + rapport_mission().generer_rapport_synthese(resultat) + progress.update(task, description="Mission complète effectuée.") # TODO: MAJ de la progress bar -> step: Mission complète effectuée + + # Mise à jour des statistiques globales + self.missions_completees.append({ + "dossier": dossier_chiffres, + "resultats": resultats, + "temps_total": time.time() - debut_mission + }) + + return resultats + + except Exception as e: + print(f"Erreur lors de la mission complète: {str(e)}") + return [] + + + def attaque_dictionnaire_manuelle(self, chemin_fichier: str, algorithme_choisi: str, chemin_dictionnaire: str) -> ResultatAnalyse: + """ + ATTAQUE PAR DICTIONNAIRE MANUELLE + - Choix du fichier et de l'algorithme + - Suivi en temps réel des tentatives + - Affichage des résultats intermédiaires + + Args: + chemin_fichier(str): chemin du fichier à attaquer + algorithme_choisi(str): algorithme à utiliser + + Returns: + ResultatAnalyse: résultat de l'attaque + """ + + + debut_attaque = time.time() + resultat = ResultatAnalyse("", b"", 0.0, b"", 0.0, 0) + + try: + if algorithme_choisi not in self.analyzers: + print(f"Algorithme {algorithme_choisi} non disponible") + return resultat + + analyzer = self.analyzers[algorithme_choisi] + + # Vérification de l'algorithme + score = analyzer.identifier_algo(chemin_fichier) + resultat.score_probabilite = score + resultat.algo = algorithme_choisi + print(f"Score de confirmation: {score:.2f}") + + if score < 0.3: + print("Score de confiance faible pour cet algorithme") + + # Génération des clés candidates + print(f"Génération des clés candidates") + cles_candidates = analyzer.generer_cles_candidates(chemin_dictionnaire) + print(f"{len(cles_candidates)} clés candidates générées") + + # Attaque par dictionnaire + + self.__tenter_dechiffrement_avec_dictionnaire(chemin_fichier, cles_candidates, analyzer, resultat) + + + temps_execution = time.time() - debut_attaque + resultat.temps_execution = temps_execution + print(f"Temps d'exécution: {temps_execution:.2f} secondes") + + return resultat + + except Exception as e: + print(f"Erreur lors de l'attaque: {str(e)}") + temps_execution = time.time() - debut_attaque + return ResultatAnalyse("", b"", 0.0, b"", temps_execution, 0) + + def attaque_dictionnaire(self,chemin_fichier_chiffrer: str, algo : str, chemin_dico : str = "keys/wordlist.txt"): + + with Progress() as progress: + analyzer = self.analyzers[algo] + + cle_candidates = analyzer.generer_cles_candidates(chemin_dico) + + with open(chemin_dico,'r') as d: + dico = d.readlines() + + with open(f"data/{chemin_fichier_chiffrer}",'rb') as f : + texte_chiffrer = f.read() + + task_id = progress.add_task("Testing...",total=len(cle_candidates)) + + current_task = 0 + + advance = 1 + + + while current_task < len(cle_candidates) : + time.sleep(0.5) + + essai_dechiffrage = analyzer.dechiffrer(f"data/{chemin_fichier_chiffrer}", cle_candidates[current_task]) + + if essai_dechiffrage != b"" : + + progress.update(task_id,advance=len(cle_candidates) - current_task) + + # Retourner un texte décodé/nettoyé pour affichage propre + return essai_dechiffrage.decode('utf-8', errors='ignore').replace('\x00', ' ') + + current_task+=1 + + progress.update(task_id,advance=advance) + + + return "Aucune clé trouvé" + + # print("\n Process is done ...") + + +print(DetecteurCryptoOrchestrateur().attaque_dictionnaire("mission1.enc","AES-CBC-256"))