Skip to content

Commit 2cd4d28

Browse files
adding sharing tests
1 parent 3622ba2 commit 2cd4d28

File tree

3 files changed

+182
-3
lines changed

3 files changed

+182
-3
lines changed

tests/sharing_test.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
import unittest
2+
3+
from tests.uid2_token_generator import UID2TokenGenerator, Params
4+
from uid2_client import *
5+
import base64
6+
7+
_master_secret = bytes(
8+
[139, 37, 241, 173, 18, 92, 36, 232, 165, 168, 23, 18, 38, 195, 123, 92, 160, 136, 185, 40, 91, 173, 165, 221, 168,
9+
16, 169, 164, 38, 139, 8, 155])
10+
_site_secret = bytes(
11+
[32, 251, 7, 194, 132, 154, 250, 86, 202, 116, 104, 29, 131, 192, 139, 215, 48, 164, 11, 65, 226, 110, 167, 14, 108,
12+
51, 254, 125, 65, 24, 23, 133])
13+
_master_key_id = 164
14+
_site_key_id = 165
15+
_test_site_key_id = 166
16+
_site_id = 9000
17+
_site_id2 = 2
18+
19+
_example_id = 'ywsvDNINiZOVSsfkHpLpSJzXzhr6Jx9Z/4Q0+lsEUvM='
20+
_now = dt.datetime.now(tz=timezone.utc)
21+
_master_key = EncryptionKey(_master_key_id, -1, _now - dt.timedelta(days=-1), _now, _now + dt.timedelta(days=1),
22+
_master_secret, keyset_id=9999)
23+
_site_key = EncryptionKey(_site_key_id, _site_id, _now - dt.timedelta(days=-1), _now, _now + dt.timedelta(days=1),
24+
_site_secret, keyset_id=99999)
25+
26+
_client_secret = "ywsvDNINiZOVSsfkHpLpSJzXzhr6Jx9Z/4Q0+lsEUvM="
27+
_example_uid = "ywsvDNINiZOVSsfkHpLpSJzXzhr6Jx9Z/4Q0+lsEUvM="
28+
29+
30+
class TestSharing(unittest.TestCase):
31+
def setup_sharing_and_encrypt(self, id_scope=IdentityScope.UID2):
32+
client = Uid2Client("endpoint", "key", _client_secret)
33+
json = self._key_set_to_json_for_sharing([_master_key, _site_key])
34+
keys = client.refresh_json(json)
35+
36+
ad_token = encrypt_key(_example_uid, id_scope, keys)
37+
38+
return ad_token, keys
39+
40+
def _key_set_to_json_for_sharing(self, keys):
41+
return self._key_set_to_json_for_sharing_with_header("\"default_keyset_id\": 99999,", _site_id, keys)
42+
43+
def _key_set_to_json_for_sharing_with_header(self, default_keyset, caller_site_id, keys):
44+
return """{{
45+
"body": {{
46+
"caller_site_id": {0},
47+
"master_keyset_id": 1,
48+
"token_expiry_seconds": 86400,
49+
{1}
50+
"keys": [{2}
51+
]
52+
}}
53+
}}""".format(caller_site_id, default_keyset, ",\n".join([self.format_key(x) for x in keys]))
54+
55+
def format_key(self, key: EncryptionKey):
56+
return """
57+
{{
58+
"id": {0},
59+
{1}
60+
"created": {2},
61+
"activates": {3},
62+
"expires": {4},
63+
"secret": "{5}"
64+
}}""".format(key.key_id,
65+
"" if key.keyset_id is None else '"keyset_id": ' + str(key.keyset_id) + ",",
66+
int(key.created.timestamp()),
67+
int(key.activates.timestamp()),
68+
int(key.expires.timestamp()),
69+
base64.b64encode(key.secret).decode("utf-8"))
70+
71+
def test_can_encrypt_and_decrypt_for_sharing(self):
72+
ad_token, keys = self.setup_sharing_and_encrypt()
73+
results = decrypt_token(ad_token, keys)
74+
self.assertEqual(_example_uid, results.uid2)
75+
76+
def test_can_decrypt_another_clients_encrypted_token(self):
77+
ad_token, keys = self.setup_sharing_and_encrypt()
78+
receiving_client = Uid2Client("endpoint2", "authkey2", _client_secret)
79+
keys_json = self._key_set_to_json_for_sharing_with_header('"default_keyset_id": 12345,', 4874, [_master_key, _site_key])
80+
81+
receiving_keys = receiving_client.refresh_json(keys_json)
82+
83+
result = decrypt_token(ad_token, receiving_keys)
84+
self.assertEqual(_example_uid, result.uid2)
85+
86+
def test_sharing_token_is_v4(self):
87+
ad_token, keys = self.setup_sharing_and_encrypt()
88+
contains_base_64_special_chars = "+" in ad_token or "/" in ad_token or "=" in ad_token
89+
self.assertFalse(contains_base_64_special_chars)
90+
91+
def test_uid2_client_produces_uid2_token(self):
92+
ad_token, keys = self.setup_sharing_and_encrypt()
93+
self.assertEqual("A", ad_token[0])
94+
95+
def test_euid_client_produces_euid_token(self):
96+
ad_token, keys = self.setup_sharing_and_encrypt(IdentityScope.EUID)
97+
self.assertEqual("E", ad_token[0])
98+
99+
def _get_token_identity_type(self, uid2, keys):
100+
token = encrypt_key(uid2, IdentityScope.UID2, keys)
101+
102+
first_char = token[0]
103+
if ('A' == first_char or 'E' == first_char):
104+
return IdentityType.Email
105+
if ('F' == first_char or 'B' == first_char):
106+
return IdentityType.Phone
107+
108+
return None
109+
110+
def test_raw_uid_produces_correct_identity_type_in_token(self):
111+
ad_token, keys = self.setup_sharing_and_encrypt()
112+
113+
self.assertEqual(IdentityType.Email, self._get_token_identity_type("Q4bGug8t1xjsutKLCNjnb5fTlXSvIQukmahYDJeLBtk=", keys))
114+
self.assertEqual(IdentityType.Phone, self._get_token_identity_type("BEOGxroPLdcY7LrSiwjY52+X05V0ryELpJmoWAyXiwbZ", keys))
115+
self.assertEqual(IdentityType.Email, self._get_token_identity_type("oKg0ZY9ieD/CGMEjAA0kcq+8aUbLMBG0MgCT3kWUnJs=", keys))
116+
self.assertEqual(IdentityType.Email, self._get_token_identity_type("AKCoNGWPYng/whjBIwANJHKvvGlGyzARtDIAk95FlJyb", keys))
117+
self.assertEqual(IdentityType.Email, self._get_token_identity_type("EKCoNGWPYng/whjBIwANJHKvvGlGyzARtDIAk95FlJyb", keys))
118+
119+
def test_multiple_keys_per_keyset(self):
120+
master_key2 = EncryptionKey(264, -1, _now - dt.timedelta(days=-2), _now - dt.timedelta(days=-1), _now - dt.timedelta(hours=-1),
121+
_master_secret, keyset_id=9999)
122+
site_key2 = EncryptionKey(_site_key_id, _site_id, _now - dt.timedelta(days=-2), _now - dt.timedelta(days=-1), _now - dt.timedelta(hours=-1),
123+
_site_secret, keyset_id=99999)
124+
client = Uid2Client("endpoint", "authkey", _client_secret)
125+
json_body = self._key_set_to_json_for_sharing([_master_key, master_key2, _site_key, site_key2])
126+
keys = client.refresh_json(json_body)
127+
128+
ad_token = encrypt_key(_example_uid, IdentityScope.UID2, keys)
129+
130+
result = decrypt_token(ad_token, keys)
131+
132+
self.assertEqual(_example_uid, result.uid2)
133+
134+
def test_cannot_encrypt_if_no_key_from_default_keyset(self):
135+
client = Uid2Client("endpoint", "authkey", _client_secret)
136+
json_body = self._key_set_to_json_for_sharing([_master_key])
137+
keys = client.refresh_json(json_body)
138+
139+
self.assertRaises(EncryptionError, encrypt_key, _example_uid, IdentityScope.UID2, keys)
140+
141+
# def test_cannot_encrypt_if_theres_no_default_keyset_header(self):
142+
# client = Uid2Client("endpoint", "authkey", _client_secret)
143+
# json_body = self._key_set_to_json_for_sharing_with_header("", 99999, [_master_key, _site_key])
144+
# keys = client.refresh_json(json_body)
145+
#
146+
# self.assertRaises(EncryptionError, encrypt_key, _example_uid, IdentityScope.UID2, keys)
147+
148+
def test_expiry_in_token_matches_expiry_in_reponse(self):
149+
client = Uid2Client("endpoint", "authkey", _client_secret)
150+
json_body = self._key_set_to_json_for_sharing_with_header('"default_keyset_id": 99999, "token_expiry_seconds": 2,', 99999, [_master_key, _site_key])
151+
keys = client.refresh_json(json_body)
152+
153+
now = dt.datetime.now(tz=timezone.utc)
154+
ad_token = encrypt_key(_example_uid, IdentityScope.UID2, keys)
155+
156+
result = decrypt_token(ad_token, keys, now=now+dt.timedelta(seconds=1))
157+
self.assertEqual(_example_uid, result.uid2)
158+
159+
self.assertRaises(EncryptionError, decrypt_token, ad_token, keys, now=now+dt.timedelta(seconds=3))
160+
161+
def test_encrypt_key_inactive(self):
162+
client = Uid2Client("endpoint", "authkey", _client_secret)
163+
key = EncryptionKey(245, _site_id, _now, _now + dt.timedelta(days=1), _now +dt.timedelta(days=2), _site_secret, keyset_id=99999)
164+
keys = client.refresh_json(self._key_set_to_json_for_sharing([_master_key, key]))
165+
self.assertRaises(EncryptionError, encrypt_key, _example_uid, IdentityScope.UID2, keys)
166+
167+
def test_encrypt_key_expired(self):
168+
client = Uid2Client("endpoint", "authkey", _client_secret)
169+
key = EncryptionKey(245, _site_id, _now, _now, _now - dt.timedelta(days=1), _site_secret, keyset_id=99999)
170+
keys = client.refresh_json(self._key_set_to_json_for_sharing([_master_key, key]))
171+
self.assertRaises(EncryptionError, encrypt_key, _example_uid, IdentityScope.UID2, keys)
172+
173+

uid2_client/client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ def refresh_keys(self):
6565
req, nonce = self._make_v2_request(dt.datetime.now(tz=timezone.utc))
6666
resp = self._post('/v2/key/sharing', headers=self._auth_headers(), data=req)
6767
resp_body = json.loads(self._parse_v2_response(resp.read(), nonce)).get('body')
68+
return self._parse_keys_json(resp_body)
69+
70+
def refresh_json(self, json_str):
71+
body = json.loads(json_str)
72+
return self._parse_keys_json(body['body'])
73+
74+
75+
def _parse_keys_json(self, resp_body):
6876
keys = []
6977
for key in resp_body["keys"]:
7078
keyset_id = None
@@ -78,7 +86,6 @@ def refresh_keys(self):
7886
base64.b64decode(key['secret']),
7987
keyset_id)
8088
keys.append(key)
81-
8289
return EncryptionKeysCollection(keys, resp_body["caller_site_id"], resp_body["master_keyset_id"],
8390
resp_body["default_keyset_id"], resp_body["token_expiry_seconds"])
8491

uid2_client/encryption.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,7 @@ def encrypt_key(uid2, indentity_scope, keys, keyset_id=None, **kwargs):
224224
return
225225

226226
if key is None:
227-
print("No Keyset Key found")
228-
return
227+
raise EncryptionError("No Keyset Key Found")
229228

230229
return _encrypt_token(uid2, indentity_scope, master_key, key, site_id, now, token_expiry, ad_token_version)
231230

0 commit comments

Comments
 (0)