22from unittest .mock import patch
33
44from test_utils import *
5- from uid2_client import BidstreamClient , EncryptionError , Uid2Base64UrlCoder , DecryptionStatus
5+ from uid2_client import BidstreamClient , Uid2Base64UrlCoder , DecryptionStatus , Uid2ClientFactory
66from uid2_client .refresh_response import RefreshResponse
77
88
@@ -15,21 +15,30 @@ class TestBidStreamClient(unittest.TestCase):
1515 _CONST_BASE_URL = 'base_url'
1616 _CONST_API_KEY = 'api_key'
1717
18- def assert_success (self , decryption_response , token_version , scope ):
18+ def _assert_success (self , decryption_response , token_version , scope ):
19+ self .assertTrue (decryption_response .success )
20+ self .assertEqual (decryption_response .uid , example_uid )
1921 self .assertEqual (decryption_response .advertising_token_version , token_version )
2022 if (token_version == AdvertisingTokenVersion .ADVERTISING_TOKEN_V3
2123 or token_version == AdvertisingTokenVersion .ADVERTISING_TOKEN_V4 ):
2224 self .assertEqual (decryption_response .identity_type , IdentityType .Email )
2325 else :
2426 self .assertEqual (decryption_response .identity_type , None )
25- self .assertEqual (decryption_response .uid , example_uid )
2627 self .assertEqual (decryption_response .identity_scope , scope )
2728 self .assertEqual (decryption_response .is_client_side_generated , False )
28- self .assertEqual ((now - decryption_response .established ).total_seconds (), 0 )
29+
30+ def _assert_fails (self , decryption_response , token_version , scope ):
31+ self .assertFalse (decryption_response .success )
32+ self .assertEqual (decryption_response .status , DecryptionStatus .INVALID_TOKEN_LIFETIME )
33+ self .assertEqual (decryption_response .advertising_token_version , token_version )
34+ self .assertEqual (decryption_response .identity_scope , scope )
35+ if (token_version == AdvertisingTokenVersion .ADVERTISING_TOKEN_V3
36+ or token_version == AdvertisingTokenVersion .ADVERTISING_TOKEN_V4 ):
37+ self .assertEqual (decryption_response .identity_type , IdentityType .Email )
2938
3039 def decrypt_and_assert_success (self , token , token_version , scope ):
3140 decrypted = self ._client .decrypt_token_into_raw_uid (token , None )
32- self .assert_success (decrypted , token_version , scope )
41+ self ._assert_success (decrypted , token_version , scope )
3342
3443 def setUp (self ):
3544 self ._key_collection = create_key_collection (IdentityScope .UID2 )
@@ -47,9 +56,8 @@ def test_smoke_test(self, mock_refresh_bidstream_keys): # SmokeTest
4756 def test_phone_uids (self , mock_refresh_bidstream_keys ): # PhoneTest
4857 for expected_scope , expected_version in test_cases_all_scopes_v3_v4_versions :
4958 with self .subTest (expected_scope = expected_scope , expected_version = expected_version ):
50- mock_refresh_bidstream_keys .return_value = RefreshResponse .make_success (EncryptionKeysCollection ([master_key , site_key ],
51- expected_scope , site_id , 1 ,
52- 99999 , 86400 ))
59+ mock_refresh_bidstream_keys .return_value = RefreshResponse .make_success (
60+ create_key_collection (expected_scope ))
5361 self ._client .refresh ()
5462 token = generate_uid_token (expected_scope , expected_version , phone_uid )
5563 self .assertEqual (IdentityType .Phone , get_identity_type (token ))
@@ -72,19 +80,18 @@ def test_token_lifetime_too_long_for_bidstream(self, mock_refresh_bidstream_keys
7280 max_bidstream_lifetime_seconds = max_bidstream_lifetime ))
7381 self ._client .refresh ()
7482 result = self ._client .decrypt_token_into_raw_uid (token , None )
75- self .assertFalse (result .success )
76- self .assertEqual (result .status , DecryptionStatus .INVALID_TOKEN_LIFETIME )
83+ self ._assert_fails (result , expected_version , expected_scope )
7784
7885 def test_token_generated_in_the_future_to_simulate_clock_skew (self , mock_refresh_bidstream_keys ): # TokenGeneratedInTheFutureToSimulateClockSkew
7986 created_at_future = dt .datetime .now (tz = timezone .utc ) + dt .timedelta (minutes = 31 ) #max allowed clock skew is 30m
8087 for expected_scope , expected_version in test_cases_all_scopes_all_versions :
8188 with self .subTest (expected_scope = expected_scope , expected_version = expected_version ):
8289 token = generate_uid_token (expected_scope , expected_version , created_at = created_at_future )
83- mock_refresh_bidstream_keys .return_value = RefreshResponse .make_success (EncryptionKeysCollection (
84- [ master_key , site_key ], expected_scope , site_id , 1 , 99999 , 86400 ))
90+ mock_refresh_bidstream_keys .return_value = RefreshResponse .make_success (
91+ create_key_collection ( expected_scope ))
8592 self ._client .refresh ()
8693 result = self ._client .decrypt_token_into_raw_uid (token , None )
87- self .assertFalse (result . success )
94+ self ._assert_fails (result , expected_version , expected_scope )
8895 self .assertEqual (result .status , DecryptionStatus .INVALID_TOKEN_LIFETIME )
8996
9097 def test_token_generated_in_the_future_within_allowed_clock_skew (self , mock_refresh_bidstream_keys ): # TokenGeneratedInTheFutureWithinAllowedClockSkew
@@ -93,13 +100,53 @@ def test_token_generated_in_the_future_within_allowed_clock_skew(self, mock_refr
93100 with self .subTest (expected_scope = expected_scope , expected_version = expected_version ):
94101 token = generate_uid_token (expected_scope , expected_version , expires_at = created_at_future )
95102 mock_refresh_bidstream_keys .return_value = RefreshResponse .make_success (
96- EncryptionKeysCollection ([master_key , site_key ], expected_scope , site_id , 1 ,
97- 99999 , 86400 ))
103+ create_key_collection (expected_scope ))
98104 self ._client .refresh ()
99- result = self ._client .decrypt_token_into_raw_uid (token , None )
100- self .assertIsNotNone (result )
101- self .assertEqual (result .identity_scope , expected_scope )
102- self .assertEqual (result .advertising_token_version , expected_version )
105+ self .decrypt_and_assert_success (token , expected_version , expected_scope )
106+
107+ @patch ('uid2_client.client.refresh_sharing_keys' )
108+ def test_token_generated_in_the_future_legacy_client (self , mock_refresh_keys_util , mock_refresh_bidstream_keys ): # TokenGeneratedInTheFutureLegacyClient
109+ created_at_future = dt .datetime .now (tz = timezone .utc ) + dt .timedelta (minutes = 3 ) # max allowed clock skew is 30m
110+ legacy_client = Uid2ClientFactory .create (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
111+
112+ for expected_scope , expected_version in test_cases_all_scopes_all_versions :
113+ with self .subTest (expected_scope = expected_scope , expected_version = expected_version ):
114+ mock_refresh_keys_util .return_value = RefreshResponse .make_success (create_key_collection (
115+ expected_scope ))
116+ legacy_client .refresh_keys ()
117+ token = generate_uid_token (expected_scope , expected_version , created_at = created_at_future )
118+ result = legacy_client .decrypt (token )
119+ self ._assert_success (result , expected_version , expected_scope )
120+
121+ @patch ('uid2_client.client.refresh_sharing_keys' )
122+ def test_token_lifetime_too_long_legacy_client (self , mock_refresh_keys_util ,
123+ mock_refresh_bidstream_keys ): # TokenLifetimeTooLongLegacyClient
124+ expires_in_sec = IN_3_DAYS + dt .timedelta (minutes = 1 )
125+ legacy_client = Uid2ClientFactory .create (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
126+
127+ for expected_scope , expected_version in test_cases_all_scopes_all_versions :
128+ with self .subTest (expected_scope = expected_scope , expected_version = expected_version ):
129+ mock_refresh_keys_util .return_value = RefreshResponse .make_success (create_key_collection (
130+ expected_scope ))
131+ legacy_client .refresh_keys ()
132+ token = generate_uid_token (expected_scope , expected_version , expires_at = expires_in_sec )
133+ result = legacy_client .decrypt (token )
134+ self ._assert_success (result , expected_version , expected_scope ) # check skipped for legacy clients
135+
136+ def test_identity_scope_and_types (self , mock_refresh_bidstream_keys ): # IdentityScopeAndType_TestCases
137+ test_cases = [
138+ [example_email_raw_uid2_v2 , IdentityScope .UID2 , IdentityType .Email ],
139+ [example_phone_raw_uid2_v2 , IdentityScope .UID2 , IdentityType .Phone ],
140+ [example_email_raw_uid2_v2 , IdentityScope .EUID , IdentityType .Email ],
141+ [example_phone_raw_uid2_v2 , IdentityScope .EUID , IdentityType .Phone ]
142+ ]
143+ for uid , identity_scope , identity_type in test_cases :
144+ with self .subTest (identity_scope = identity_scope , identity_type = identity_type ):
145+ token = generate_uid_token (identity_scope , AdvertisingTokenVersion .ADVERTISING_TOKEN_V4 )
146+ mock_refresh_bidstream_keys .return_value = RefreshResponse .make_success (
147+ create_key_collection (identity_scope ))
148+ self ._client .refresh ()
149+ self .decrypt_and_assert_success (token , AdvertisingTokenVersion .ADVERTISING_TOKEN_V4 , identity_scope )
103150
104151 def test_empty_keys (self , mock_refresh_bidstream_keys ): # EmptyKeyContainer
105152 token = generate_uid_token (IdentityScope .UID2 , AdvertisingTokenVersion .ADVERTISING_TOKEN_V3 )
0 commit comments