1010from datetime import timezone
1111import os
1212from Crypto .Cipher import AES
13- from enum import Enum
1413
14+ from uid2_client .uid2_token_generator import Params , UID2TokenGenerator , _encrypt_gcm , _PayloadType
1515from uid2_client .advertising_token_version import AdvertisingTokenVersion
1616from uid2_client .client_type import ClientType
1717from uid2_client .decryption_status import DecryptionStatus
2222from uid2_client .identity_scope import IdentityScope
2323
2424
25- encryption_block_size = AES .block_size
26- """int: block size for encryption routines
27-
28- This determines the size of initialization vectors (IV), required data padding, etc.
29- """
30-
31-
32- class _PayloadType (Enum ):
33- """Enum for types of payload that can be encoded in opaque strings"""
34- ENCRYPTED_DATA = 128
35- ENCRYPTED_DATA_V3 = 96
36-
37-
3825base64_url_special_chars = {"-" , "_" }
3926
4027
@@ -255,48 +242,6 @@ def _decrypt_token_v3(token_bytes, keys, domain_name, client_type, now, token_ve
255242 keys .get_identity_scope (), identity_type , token_version , is_client_side_generated , expires )
256243
257244
258- def _encrypt_token (uid2 , identity_scope , master_key , site_key , site_id , now , token_expiry , ad_token_version ):
259- site_payload = bytearray (128 )
260- # Publisher Data
261- site_payload [0 :4 ] = int .to_bytes (site_id , byteorder = 'big' , length = 4 ) # Site id
262- site_payload [4 :12 ] = int .to_bytes (0 , byteorder = 'big' , length = 8 ) # Publisher ID
263- site_payload [12 :16 ] = int .to_bytes (0 , byteorder = 'big' , length = 4 ) # Client Key ID
264- # User Identity Data
265- site_payload [16 :20 ] = int .to_bytes (0 , byteorder = 'big' , length = 4 ) # Privacy Bits
266- site_payload [20 :28 ] = int .to_bytes (int (now .timestamp ()) * 1000 , byteorder = 'big' ,
267- length = 8 ) # Established
268- site_payload [28 :36 ] = int .to_bytes (int (now .timestamp ()) * 1000 , byteorder = 'big' , length = 8 ) # last refresh
269- site_payload [36 :] = bytes (base64 .b64decode (uid2 ))
270-
271- id_payload = _encrypt_gcm (bytes (site_payload ), None , site_key .secret )
272-
273- # Operator Identity Data
274- master_payload = bytearray (256 )
275- master_payload [:8 ] = int .to_bytes (int (token_expiry .timestamp ()) * 1000 , byteorder = 'big' , length = 8 ) # Expiry
276- master_payload [8 :16 ] = int .to_bytes (int (now .timestamp ()), byteorder = 'big' , length = 8 ) # Token Created
277- master_payload [16 :20 ] = int .to_bytes (0 , byteorder = 'big' , length = 4 ) # Site ID
278- master_payload [20 :21 ] = int .to_bytes (1 , byteorder = 'big' , length = 1 ) # Operator Type
279- master_payload [21 :25 ] = int .to_bytes (0 , byteorder = 'big' , length = 4 ) # Operator Version
280- master_payload [25 :29 ] = int .to_bytes (0 , byteorder = 'big' , length = 4 ) # Operator Key ID
281- master_payload [29 :33 ] = int .to_bytes (site_key .key_id , byteorder = 'big' , length = 4 ) # Site Key ID
282- master_payload [33 :] = bytes (id_payload )
283-
284- encrypted_master_payload = _encrypt_gcm (bytes (master_payload ), None , master_key .secret )
285-
286- root_writer = bytearray (len (encrypted_master_payload ) + 6 )
287- first_char = uid2 [0 ]
288- identity_type = IdentityType .Phone if first_char == 'F' or first_char == 'B' else IdentityType .Email
289- root_writer [0 :1 ] = int .to_bytes ((int (identity_scope ) << 4 | int (identity_type ) << 2 ) | 3 , byteorder = 'big' , length = 1 )
290- root_writer [1 :2 ] = int .to_bytes (ad_token_version .value , byteorder = 'big' , length = 1 )
291- root_writer [2 :6 ] = int .to_bytes (master_key .key_id , byteorder = 'big' , length = 4 )
292- root_writer [6 :] = bytes (encrypted_master_payload )
293-
294- if ad_token_version == AdvertisingTokenVersion .ADVERTISING_TOKEN_V4 :
295- return EncryptionDataResponse .make_success (Uid2Base64UrlCoder .encode (root_writer ))
296-
297- return EncryptionDataResponse .make_success (base64 .b64encode (root_writer ))
298-
299-
300245# DEPRECATED, DO NOT CALL DIRECTLY. PLEASE USE Uid2Client's client.encrypt()
301246def encrypt (uid2 , identity_scope , keys , keyset_id = None , ** kwargs ):
302247 """ Encrypt an UID2 into a sharing token
@@ -341,7 +286,8 @@ def encrypt(uid2, identity_scope, keys, keyset_id=None, **kwargs):
341286 if identity_scope is None :
342287 identity_scope = keys .get_identity_scope ()
343288 try :
344- return _encrypt_token (uid2 , identity_scope , master_key , key , site_id , now , token_expiry , ad_token_version )
289+ params = Params (expiry = token_expiry , identity_scope = identity_scope , token_created_at = now )
290+ return EncryptionDataResponse .make_success (UID2TokenGenerator .generate_uid2_token_v4 (uid2 , master_key , site_id , key , params ))
345291 except Exception :
346292 return EncryptionDataResponse .make_error (EncryptionStatus .ENCRYPTION_FAILURE )
347293
@@ -431,10 +377,6 @@ def encrypt_data(data, identity_scope, **kwargs):
431377 return base64 .b64encode (result ).decode ('ascii' )
432378
433379
434- def _encrypt_data_v1 (data , key , iv ):
435- return int .to_bytes (key .key_id , 4 , 'big' ) + iv + _encrypt (data , iv , key )
436-
437-
438380# DEPRECATED, DO NOT CALL
439381def decrypt_data (encrypted_data , keys ):
440382 """Decrypt data encrypted with encrypt_data().
@@ -503,16 +445,6 @@ def _decrypt_data_v3(encrypted_bytes, keys):
503445 return DecryptedData (payload [12 :], encrypted_at )
504446
505447
506- def _add_pkcs7_padding (data , block_size ):
507- pad_len = block_size - (len (data ) % block_size )
508- return data + bytes ([pad_len ]) * pad_len
509-
510-
511- def _encrypt (data , iv , key ):
512- cipher = AES .new (key .secret , AES .MODE_CBC , IV = iv )
513- return cipher .encrypt (_add_pkcs7_padding (data , AES .block_size ))
514-
515-
516448def _decrypt (encrypted , iv , key ):
517449 cipher = AES .new (key .secret , AES .MODE_CBC , iv = iv )
518450 data = cipher .decrypt (encrypted )
@@ -521,16 +453,6 @@ def _decrypt(encrypted, iv, key):
521453 return data [:- pad_len ]
522454
523455
524- def _encrypt_gcm (data , iv , secret ):
525- if iv is None :
526- iv = os .urandom (12 )
527- elif len (iv ) != 12 :
528- raise ValueError ("iv must be 12 bytes" )
529- cipher = AES .new (secret , AES .MODE_GCM , nonce = iv )
530- ciphertext , tag = cipher .encrypt_and_digest (data )
531- return cipher .nonce + ciphertext + tag
532-
533-
534456def _decrypt_gcm (encrypted , secret ):
535457 cipher = AES .new (secret , AES .MODE_GCM , nonce = encrypted [:12 ])
536458 return cipher .decrypt_and_verify (encrypted [12 :- 16 ], encrypted [- 16 :])
0 commit comments