From 5f042146de94945a4323a356d6ecae984323ca8d Mon Sep 17 00:00:00 2001 From: Mahmoud Hashemi Date: Fri, 13 Feb 2026 21:37:42 +0000 Subject: [PATCH 1/5] modernize: py3 cleanup, per-custodian KDF params, pyproject.toml - Remove all Python 2 compat (unicode shims, cStringIO, imp, __future__) - Add per-custodian KDF parameter storage (v1 pwdkm format) - v0 custodians (existing) continue using module-level OPSLIMIT/MEMLIMIT - v1 custodians store opslimit/memlimit in their pwdkm field - Backward compatible: v0 files load and work unchanged - Add KDF_SENSITIVE and KDF_INTERACTIVE named parameter tuples - SENSITIVE: ~0.8s per KDF call (production default) - INTERACTIVE: ~0.1s per KDF call (dev/testing, 8x faster) - Add --fast-crypto CLI flag for init/add-key-custodian - Convert setup.py to pyproject.toml - Modernize tox.ini (py39-py313) - Expand test suite: 11 tests (was 4), covering v0/v1 round-trip, coexistence, KDF params, check_creds, audit log, error cases - Bump version to 24.0.0dev --- .gitignore | 1 + pocket_protector/__init__.py | 2 + pocket_protector/__main__.py | 14 ++- pocket_protector/_version.py | 7 +- pocket_protector/cli.py | 23 ++--- pocket_protector/file_keys.py | 82 +++++++++++----- pocket_protector/tests/conftest.py | 1 - pocket_protector/tests/test_cli.py | 30 ++++-- pocket_protector/tests/test_file_keys.py | 119 ++++++++++++++++++++++- pyproject.toml | 41 ++++++++ requirements.in | 9 +- setup.py | 62 ------------ tox.ini | 26 +---- 13 files changed, 260 insertions(+), 157 deletions(-) create mode 100644 pyproject.toml delete mode 100644 setup.py diff --git a/.gitignore b/.gitignore index 8e63b65..d93868e 100644 --- a/.gitignore +++ b/.gitignore @@ -48,3 +48,4 @@ nosetests.xml *.sw[op] .cache/ +.venv/ diff --git a/pocket_protector/__init__.py b/pocket_protector/__init__.py index e69de29..42f9373 100644 --- a/pocket_protector/__init__.py +++ b/pocket_protector/__init__.py @@ -0,0 +1,2 @@ +from ._version import __version__ +from .file_keys import KeyFile, Creds, PPError, KDF_SENSITIVE, KDF_INTERACTIVE diff --git a/pocket_protector/__main__.py b/pocket_protector/__main__.py index 68b0bbc..ef9d00c 100644 --- a/pocket_protector/__main__.py +++ b/pocket_protector/__main__.py @@ -1,12 +1,18 @@ import os import sys -from .cli import main +from .cli import main as _main -if __name__ == '__main__': # pragma: no cover + +def main(): try: - sys.exit(main() or 0) + sys.exit(_main() or 0) except Exception: if os.getenv('PPROTECT_ENABLE_DEBUG'): - import pdb;pdb.post_mortem() + import pdb + pdb.post_mortem() raise + + +if __name__ == '__main__': + main() diff --git a/pocket_protector/_version.py b/pocket_protector/_version.py index f41ac9e..9765f67 100644 --- a/pocket_protector/_version.py +++ b/pocket_protector/_version.py @@ -1,6 +1 @@ - -# The version of PocketProtector, used in the version subcommand, as -# well as in setup.py. For full release directions, see the bottom of -# setup.py. - -__version__ = '20.0.2dev' +__version__ = '24.0.0dev' diff --git a/pocket_protector/cli.py b/pocket_protector/cli.py index 3afd2ca..6bcff9e 100644 --- a/pocket_protector/cli.py +++ b/pocket_protector/cli.py @@ -23,20 +23,6 @@ # TODO: custodian-signed values. allow custodians to sign values # added/set by others, then produced reports on which secrets have been # updated/changed but not signed yet. enables a review/audit mechanism. - -try: - unicode -except NameError: - # py3 - unicode = str - - -def _get_text(inp): - if not isinstance(inp, unicode): - return inp.decode('utf8') - return inp - - def _create_protected(path): if os.path.exists(path): raise UsageError('Protected file already exists: %s' % path, 2) @@ -117,7 +103,7 @@ def _get_creds(kf, passphrase = prompt.secret('Passphrase: ', confirm=False) passphrase_source = 'stdin' - creds = Creds(_get_text(user or ''), _get_text(passphrase or ''), + creds = Creds(user or '', passphrase or '', name_source=user_source, passphrase_source=passphrase_source) _check_creds(kf, creds) @@ -162,6 +148,8 @@ def _get_cmd(prepare=False): doc="the acting user's email credential") cmd.add('--passphrase-file', doc='path to a file containing only the passphrase, likely provided by a deployment system') + cmd.add('--fast-crypto', parse_as=True, + doc='use faster (less secure) KDF parameters, suitable for development') # add middlewares, outermost first ("first added, first called") cmd.add(mw_verify_creds) @@ -213,10 +201,13 @@ def main(argv=None): # pragma: no cover (see note in tests.test_cli.test_main) The following subcommand handlers all update/write to a protected file (wkf). """ -def add_key_custodian(wkf): +def add_key_custodian(wkf, fast_crypto=None): 'add a new key custodian to the protected' echo('Adding new key custodian.') creds = _get_new_creds() + if fast_crypto: + from .file_keys import KDF_INTERACTIVE + return wkf.add_key_custodian(creds, opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1]) return wkf.add_key_custodian(creds) diff --git a/pocket_protector/file_keys.py b/pocket_protector/file_keys.py index fce3dec..21d4b7f 100644 --- a/pocket_protector/file_keys.py +++ b/pocket_protector/file_keys.py @@ -10,10 +10,8 @@ import collections import datetime import hashlib -try: - from cStringIO import StringIO -except ImportError: - from io import StringIO +import struct +from io import StringIO import attr import nacl.utils @@ -25,11 +23,6 @@ from boltons.dictutils import OMD from boltons.fileutils import atomic_save -try: - unicode = unicode -except NameError: - unicode = str # py3 - _VALID_NAME_RE = re.compile(r"^[A-z][-_A-z0-9]*\Z") @@ -71,20 +64,25 @@ def _as_l(sub_schema): return _coerce(list, sub_schema) })) -OPSLIMIT = nacl.pwhash.argon2id.OPSLIMIT_SENSITIVE -MEMLIMIT = nacl.pwhash.argon2id.MEMLIMIT_MODERATE +# KDF difficulty levels (opslimit, memlimit) +KDF_SENSITIVE = (nacl.pwhash.argon2id.OPSLIMIT_SENSITIVE, nacl.pwhash.argon2id.MEMLIMIT_MODERATE) # ~0.8s, 256MB - production default +KDF_INTERACTIVE = (nacl.pwhash.argon2id.OPSLIMIT_INTERACTIVE, nacl.pwhash.argon2id.MEMLIMIT_INTERACTIVE) # ~0.1s, 64MB - dev/testing + +# Keep module-level defaults for backward compat (v0 custodians use these) +OPSLIMIT = KDF_SENSITIVE[0] +MEMLIMIT = KDF_SENSITIVE[1] # NOTE: this is a public class since it must be passed in @attr.s(frozen=True) class Creds(object): "Stores credentials used to open a KeyFile" - name = attr.ib(validator=attr.validators.instance_of(unicode)) - passphrase = attr.ib(validator=attr.validators.instance_of(unicode)) + name = attr.ib(validator=attr.validators.instance_of(str)) + passphrase = attr.ib(validator=attr.validators.instance_of(str)) name_source = attr.ib(default=None) passphrase_source = attr.ib(default=None) -def _kdf(creds, salt): +def _kdf(creds, salt, opslimit=None, memlimit=None): name = creds.name.encode('utf8') passphrase = creds.passphrase.encode('utf8') @@ -94,8 +92,8 @@ def _kdf(creds, salt): return nacl.pwhash.argon2id.kdf( nacl.public.PrivateKey.SIZE, valet_key, hashlib.sha512(salt + name).digest()[:16], - opslimit=OPSLIMIT, - memlimit=MEMLIMIT) + opslimit=opslimit if opslimit is not None else OPSLIMIT, + memlimit=memlimit if memlimit is not None else MEMLIMIT) def _decode(b64): @@ -128,6 +126,8 @@ class _KeyCustodian(object): name = attr.ib() _public_key = attr.ib() _salt = attr.ib() + _opslimit = attr.ib(default=None) + _memlimit = attr.ib(default=None) def encrypt_for(self, bytes): 'encrypt the passed bytes so that this key-custodian can decrypt' @@ -137,27 +137,55 @@ def decrypt_as(self, creds, bytes): 'decrypt the passed bytes that were encrypted for this key-custodian' assert creds.name == self.name return nacl.public.SealedBox( - nacl.public.PrivateKey(_kdf(creds, self._salt))).decrypt(bytes) + nacl.public.PrivateKey(_kdf(creds, self._salt, + opslimit=self._opslimit, + memlimit=self._memlimit))).decrypt(bytes) @classmethod - def from_creds(cls, creds): + def from_creds(cls, creds, opslimit=None, memlimit=None): 'create a new user based on new credentials' salt = os.urandom(8) - private_key = nacl.public.PrivateKey(_kdf(creds, salt)) + private_key = nacl.public.PrivateKey(_kdf(creds, salt, + opslimit=opslimit, + memlimit=memlimit)) return cls( - name=creds.name, public_key=private_key.public_key, salt=salt) + name=creds.name, public_key=private_key.public_key, salt=salt, + opslimit=opslimit, memlimit=memlimit) @classmethod def from_data(cls, name, data): # password derived key material - pwdkm = _decode(data['pwdkm']) - salt, public_key = pwdkm[:8], pwdkm[8:8 + nacl.public.PublicKey.SIZE] - return cls( - name=name, public_key=nacl.public.PublicKey(public_key), salt=salt) + raw = base64.b64decode(data['pwdkm']) + version = raw[0] + if version == 0: + # v0: version(1) + salt(8) + pubkey(32) + payload = raw[1:] + salt, public_key = payload[:8], payload[8:8 + nacl.public.PublicKey.SIZE] + return cls( + name=name, public_key=nacl.public.PublicKey(public_key), + salt=salt, opslimit=None, memlimit=None) + elif version == 1: + # v1: version(1) + opslimit(4 LE) + memlimit(4 LE) + salt(8) + pubkey(32) + opslimit, memlimit = struct.unpack_from('20 log entries + assert len(kf.get_audit_log()) > 20 + kf2 = kf.truncate_audit_log(5) + assert len(kf2.get_audit_log()) == 6 # 1 truncation message + 5 kept + + +def test_error_cases(_fast_crypto): + """Test error handling.""" + creds = file_keys.Creds('user@example.com', 'pass') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(creds) + kf = kf.add_domain('d', creds.name) + kf = kf.set_secret('d', 'key1', 'val1') + + # Can't add duplicate domain + with pytest.raises(file_keys.PPError): + kf.add_domain('d', creds.name) + + # Can't add duplicate custodian + with pytest.raises(file_keys.PPError): + kf.add_key_custodian(creds) + + # Can't add secret that exists + with pytest.raises(file_keys.PPError): + kf.add_secret('d', 'key1', 'other') + + # Can't access nonexistent domain + with pytest.raises(file_keys.PPKeyError): + kf.decrypt_domain('nonexistent', creds) + + # Invalid secret name + with pytest.raises(ValueError): + kf.set_secret('d', '$invalid', 'val') diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..39329b7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,41 @@ +[build-system] +requires = ["setuptools>=68.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "pocket-protector" +version = "24.0.0dev" +description = "Handy secret management system with a convenient CLI and readable storage format." +readme = "README.md" +license = "Apache-2.0" +requires-python = ">=3.9" +authors = [ + {name = "Kurt Rose", email = "kurt@kurtrose.com"}, + {name = "Mahmoud Hashemi"}, +] +classifiers = [ + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "attrs", + "boltons", + "PyNaCl", + "ruamel.yaml", + "schema", + "face>=24.0.0", +] + +[project.scripts] +pprotect = "pocket_protector.__main__:main" +pocket_protector = "pocket_protector.__main__:main" + +[project.urls] +Homepage = "https://github.com/SimpleLegal/pocket_protector" + +[tool.pytest.ini_options] +testpaths = ["pocket_protector/tests"] diff --git a/requirements.in b/requirements.in index b05e78a..a0f8ecd 100644 --- a/requirements.in +++ b/requirements.in @@ -1,12 +1,7 @@ attrs boltons -coverage -face>=20.1.1 -pip-tools -PyNaCl +face>=24.0.0 pytest -# ruamel.ordereddict==0.4.14 # via ruamel.yaml # this stays commented out so py3 builds work +PyNaCl ruamel.yaml schema -tox -twine diff --git a/setup.py b/setup.py deleted file mode 100644 index 65272aa..0000000 --- a/setup.py +++ /dev/null @@ -1,62 +0,0 @@ - -import os -import imp -from setuptools import setup, find_packages - -__author__ = "Kurt Rose and Mahmoud Hashemi" -__contact__ = "kurt@kurtrose.com" -__license__ = 'Apache License 2.0' -__url__ = 'https://github.com/SimpleLegal/pocket_protector' - -CUR_PATH = os.path.abspath(os.path.dirname(__file__)) -_version_mod_path = os.path.join(CUR_PATH, 'pocket_protector', '_version.py') -_version_mod = imp.load_source('_version', _version_mod_path) -__version__ = _version_mod.__version__ - -with open('README.md') as readme: - long_description = readme.read() - - -setup( - name="pocket-protector", - description="Handy secret management system with a convenient CLI and readable storage format.", - long_description=long_description, - long_description_content_type='text/markdown', - author=__author__, - author_email=__contact__, - url=__url__, - license=__license__, - platforms='any', - version=__version__, - packages=find_packages(), - include_package_data=True, - zip_safe=False, - entry_points={'console_scripts': ['pprotect = pocket_protector.__main__:main', - 'pocket_protector = pocket_protector.__main__:main']}, - install_requires=['attrs', - 'boltons', - 'PyNaCl', - 'ruamel.yaml', - 'schema', - 'face>=20.1.1'] -) - -""" -Release process: - -* tox -* git commit (if applicable) -* Remove dev suffix from pocket_protector/_version.py version -* git commit -a -m "bump version for vX.Y.Z release" -* rm -rf dist -* python setup.py sdist bdist_wheel -* twine upload dist/* -* git tag -a vX.Y.Z -m "brief summary" -* write CHANGELOG -* git commit -* bump pocket_protector/_version.py version onto n+1 dev -* git commit -* git push - -Versions are of the format YY.MINOR.MICRO, see calver.org for more details. -""" diff --git a/tox.ini b/tox.ini index 8375cb4..c0d555a 100644 --- a/tox.ini +++ b/tox.ini @@ -1,27 +1,7 @@ [tox] -envlist = py27,py36,py37,pypy,coverage-report,packaging +envlist = py39,py310,py311,py312,py313 [testenv] -changedir = .tox -deps = -rrequirements.txt -commands = coverage run --parallel --rcfile {toxinidir}/.tox-coveragerc -m pytest --doctest-modules {envsitepackagesdir}/pocket_protector {posargs} - -# Uses default basepython otherwise reporting doesn't work on Travis where -# Python 3.6 is only available in 3.6 jobs. -[testenv:coverage-report] -changedir = .tox -deps = coverage -commands = coverage combine --rcfile {toxinidir}/.tox-coveragerc - coverage report --rcfile {toxinidir}/.tox-coveragerc - coverage html --rcfile {toxinidir}/.tox-coveragerc -d {toxinidir}/htmlcov - - -[testenv:packaging] changedir = {toxinidir} -deps = - twine - check-manifest -commands = - python setup.py sdist bdist_wheel - twine check dist/* - check-manifest +deps = pytest +commands = pytest {posargs} From 8d181f6f0bc6dc0507421a80240967cb7e9bf6ba Mon Sep 17 00:00:00 2001 From: Mahmoud Hashemi Date: Sat, 14 Feb 2026 00:46:03 +0000 Subject: [PATCH 2/5] add migrate-owner, list-user-secrets, fast-crypto passphrase support - KeyFile.migrate_owner: batch add owner across all domains - KeyFile.get_custodian_domains: list domains a custodian owns - set_key_custodian_passphrase: accept opslimit/memlimit params - CLI: migrate-owner subcommand with confirmation prompt - CLI: list-user-secrets subcommand (read-only) - CLI: set-key-custodian-passphrase now supports --fast-crypto flag - Tests for all new functionality --- pocket_protector/cli.py | 35 +++++++++++- pocket_protector/file_keys.py | 27 +++++++-- pocket_protector/tests/test_cli.py | 57 +++++++++++++++++++ pocket_protector/tests/test_file_keys.py | 72 ++++++++++++++++++++++++ 4 files changed, 186 insertions(+), 5 deletions(-) diff --git a/pocket_protector/cli.py b/pocket_protector/cli.py index 6bcff9e..df07f47 100644 --- a/pocket_protector/cli.py +++ b/pocket_protector/cli.py @@ -174,6 +174,9 @@ def _get_cmd(prepare=False): cmd.add(set_key_custodian_passphrase) cmd.add(rotate_domain_keys) + cmd.add(migrate_owner) + cmd.add(list_user_secrets) + cmd.add(decrypt_domain, posargs={'count': 1, 'provides': 'domain_name'}) cmd.add(list_domains) @@ -268,13 +271,16 @@ def rm_secret(wkf): return wkf.rm_secret(domain_name, secret_name) -def set_key_custodian_passphrase(wkf): +def set_key_custodian_passphrase(wkf, fast_crypto=None): 'update a key custodian passphrase' user_id = prompt('User email: ') passphrase = prompt.secret('Current passphrase: ') creds = Creds(user_id, passphrase) _check_creds(wkf, creds) new_passphrase = prompt.secret('New passphrase: ', confirm=True) + if fast_crypto: + from .file_keys import KDF_INTERACTIVE + return wkf.set_key_custodian_passphrase(creds, new_passphrase, opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1]) return wkf.set_key_custodian_passphrase(creds, new_passphrase) @@ -284,6 +290,21 @@ def rotate_domain_keys(wkf, creds): return wkf.rotate_domain_key(domain_name, creds) +def migrate_owner(wkf, creds): + 'grant a custodian ownership of all domains you own' + new_owner = prompt('New owner email: ') + owned = wkf.get_custodian_domains(creds.name) + if not owned: + echo('You do not own any domains.') + return None + echo('Will add %s as owner to: %s' % (new_owner, ', '.join(sorted(owned)))) + confirm = prompt('Proceed? [y/N] ') + if not confirm.lower().startswith('y'): + echo('Aborting.') + return None + return wkf.migrate_owner(new_owner, creds) + + """ Read-only operations follow """ @@ -341,6 +362,18 @@ def list_audit_log(kf): return +def list_user_secrets(kf, creds): + 'display domains and secrets accessible to the authenticated user' + owned = kf.get_custodian_domains(creds.name) + if not owned: + echo.err('User %s does not own any domains.' % creds.name) + return + for domain_name in sorted(owned): + secrets = kf.get_domain_secret_names(domain_name) + echo('%s: %s' % (domain_name, ', '.join(secrets) if secrets else '(no secrets)')) + return + + """ End subcommand handlers diff --git a/pocket_protector/file_keys.py b/pocket_protector/file_keys.py index 21d4b7f..e02bee9 100644 --- a/pocket_protector/file_keys.py +++ b/pocket_protector/file_keys.py @@ -579,8 +579,8 @@ def decrypt_domain(self, domain_name, creds): return self._domains[domain_name].get_decrypted( self._key_custodians[creds.name], creds) - def set_key_custodian_passphrase(self, creds, new_passphrase): - new_kc = _KeyCustodian.from_creds(Creds(creds.name, new_passphrase)) + def set_key_custodian_passphrase(self, creds, new_passphrase, opslimit=None, memlimit=None): + new_kc = _KeyCustodian.from_creds(Creds(creds.name, new_passphrase), opslimit=opslimit, memlimit=memlimit) cur_kc = self._key_custodians[creds.name] key_custodians = dict(self._key_custodians) key_custodians[creds.name] = new_kc @@ -595,8 +595,9 @@ def set_key_custodian_passphrase(self, creds, new_passphrase): return attr.evolve( self, key_custodians=key_custodians, domains=domains, log=self._new_log( - 'updated key custodian passphrase for {} (updated domains -> {})', - creds.name, ", ".join(updated))) + 'updated key custodian passphrase for {} (updated domains -> {}){}', + creds.name, ", ".join(updated), + ' (custom KDF params)' if (opslimit or memlimit) else '')) def check_creds(self, creds): try: @@ -609,6 +610,24 @@ def check_creds(self, creds): return False return True + def get_custodian_domains(self, key_custodian_name): + '''Return list of domain names where key_custodian_name is an owner.''' + return [name for name, domain in self._domains.items() + if key_custodian_name in domain.get_owner_names()] + + def migrate_owner(self, new_custodian_name, creds, domain_names=None): + '''Add new_custodian_name as owner to all (or specified) domains owned by creds user.''' + if new_custodian_name not in self._key_custodians: + raise PPError('no key custodian named %s' % new_custodian_name) + if domain_names is None: + domain_names = self.get_custodian_domains(creds.name) + if not domain_names: + raise PPError('%s does not own any domains' % creds.name) + result = self + for domain_name in domain_names: + result = result.add_owner(domain_name, new_custodian_name, creds) + return result + def rotate_domain_key(self, domain_name, creds): ''' rotate the keypair used to secure a domain diff --git a/pocket_protector/tests/test_cli.py b/pocket_protector/tests/test_cli.py index 07aac93..7b04c0e 100644 --- a/pocket_protector/tests/test_cli.py +++ b/pocket_protector/tests/test_cli.py @@ -175,3 +175,60 @@ def test_cli_fast_crypto_flag(tmp_path, _fast_crypto): cc2.run(['pprotect', 'add-secret'], input=['dev', 'key1', 'val1']) res = cc2.run(['pprotect', 'decrypt-domain', 'dev']) assert json.loads(res.stdout)['key1'] == 'val1' + + +def test_cli_migrate_owner(tmp_path, _fast_crypto): + """Test the migrate-owner CLI subcommand.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + + # Init with kurt + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + + # Add MH as custodian + cc.run('pprotect add-key-custodian', input=[MH_EMAIL, MH_PHRASE, MH_PHRASE]) + + # Add two domains owned by kurt, with secrets + cc.run(['pprotect', 'add-domain'], input=['dom1']) + cc.run(['pprotect', 'add-domain'], input=['dom2']) + cc.run(['pprotect', 'add-secret'], input=['dom1', 'key1', 'val1']) + cc.run(['pprotect', 'add-secret'], input=['dom2', 'key2', 'val2']) + + # Migrate ownership to MH + cc.run('pprotect migrate-owner', input=[MH_EMAIL, 'y']) + + # MH should now be able to decrypt both domains + mh_env = {'PPROTECT_USER': MH_EMAIL, 'PPROTECT_PASSPHRASE': MH_PHRASE} + cc_mh = CommandChecker(cmd, chdir=str(tmp_path), env=mh_env, reraise=True) + res = cc_mh.run(['pprotect', 'decrypt-domain', 'dom1']) + assert json.loads(res.stdout)['key1'] == 'val1' + res = cc_mh.run(['pprotect', 'decrypt-domain', 'dom2']) + assert json.loads(res.stdout)['key2'] == 'val2' + + +def test_cli_list_user_secrets(tmp_path, _fast_crypto): + """Test the list-user-secrets CLI subcommand.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + + # Init with kurt + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + + # Add domain and secret + cc.run(['pprotect', 'add-domain'], input=[DOMAIN_NAME]) + cc.run(['pprotect', 'add-secret'], input=[DOMAIN_NAME, SECRET_NAME, SECRET_VALUE]) + + # List user secrets + res = cc.run('pprotect list-user-secrets') + assert DOMAIN_NAME in res.stdout + assert SECRET_NAME in res.stdout diff --git a/pocket_protector/tests/test_file_keys.py b/pocket_protector/tests/test_file_keys.py index 2c601eb..679e952 100644 --- a/pocket_protector/tests/test_file_keys.py +++ b/pocket_protector/tests/test_file_keys.py @@ -175,3 +175,75 @@ def test_error_cases(_fast_crypto): # Invalid secret name with pytest.raises(ValueError): kf.set_secret('d', '$invalid', 'val') + + +def test_set_passphrase_with_kdf_params(_fast_crypto): + """Test set_key_custodian_passphrase with explicit KDF params.""" + from pocket_protector.file_keys import KDF_INTERACTIVE + bob = file_keys.Creds('bob@example.com', 'secret1') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(bob) # v0 custodian + kf = kf.add_domain('prod', bob.name) + kf = kf.set_secret('prod', 'db-pass', 'hunter2') + # Change passphrase with explicit KDF params + new_pass = 'new-secret1' + kf = kf.set_key_custodian_passphrase( + bob, new_pass, + opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1]) + # Verify decrypt works with new passphrase + new_bob = file_keys.Creds('bob@example.com', new_pass) + assert kf.decrypt_domain('prod', new_bob)['db-pass'] == 'hunter2' + # Round-trip through file + kf.write() + kf2 = file_keys.KeyFile.from_file(tmp.name) + assert kf2 == kf + assert kf2.decrypt_domain('prod', new_bob)['db-pass'] == 'hunter2' + + +def test_get_custodian_domains(_fast_crypto): + """Test get_custodian_domains returns correct domain lists.""" + alice = file_keys.Creds('alice@example.com', 'alice-pass') + bob = file_keys.Creds('bob@example.com', 'bob-pass') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(alice) + kf = kf.add_key_custodian(bob) + # Alice owns both domains + kf = kf.add_domain('domain1', alice.name) + kf = kf.add_domain('domain2', alice.name) + # Bob owns only domain1 + kf = kf.add_owner('domain1', bob.name, alice) + alice_domains = kf.get_custodian_domains(alice.name) + bob_domains = kf.get_custodian_domains(bob.name) + assert sorted(alice_domains) == ['domain1', 'domain2'] + assert sorted(bob_domains) == ['domain1'] + + +def test_migrate_owner(_fast_crypto): + """Test migrate_owner transfers ownership across all domains.""" + alice = file_keys.Creds('alice@example.com', 'alice-pass') + bob = file_keys.Creds('bob@example.com', 'bob-pass') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(alice) + kf = kf.add_key_custodian(bob) + # Alice owns three domains with secrets + for d in ('d1', 'd2', 'd3'): + kf = kf.add_domain(d, alice.name) + kf = kf.set_secret(d, 'key', 'val-' + d) + # Migrate ownership to bob + kf = kf.migrate_owner(bob.name, alice) + # Bob should now own all three + assert sorted(kf.get_custodian_domains(bob.name)) == ['d1', 'd2', 'd3'] + # Bob can decrypt all three + for d in ('d1', 'd2', 'd3'): + assert kf.decrypt_domain(d, bob)['key'] == 'val-' + d + # Error: nonexistent new custodian + with pytest.raises(Exception): + kf.migrate_owner('nobody@example.com', alice) + # Error: user with no domains + carol = file_keys.Creds('carol@example.com', 'carol-pass') + kf2 = kf.add_key_custodian(carol) + with pytest.raises(Exception): + kf2.migrate_owner(bob.name, carol) From 13a0386855f93386bced9bc461def4d576825b60 Mon Sep 17 00:00:00 2001 From: Mahmoud Hashemi Date: Sat, 14 Feb 2026 20:14:15 +0000 Subject: [PATCH 3/5] add v2 raw-key custodians: P<64hex>P passphrase, no KDF Raw-key custodians bypass argon2 entirely. The passphrase IS the entropy (256 bits), so password stretching adds only latency. Format: P<64 lowercase hex chars>P (e.g. P0a1b2c...P) - v2 pwdkm format: version(1=0x02) + salt(8) + pubkey(32) - _kdf_raw: sha512(passphrase + salt + name)[:32], no argon2 - generate_raw_passphrase(): creates P<64hex>P from os.urandom(32) - is_raw_passphrase(): validates format - KeyFile.add_raw_key_custodian(): creates v2 custodian - CLI: add-raw-key-custodian subcommand - Generates key, displays once with stern warning - Requires typing YES to confirm key was saved - Aborts cleanly on decline - v0/v1/v2 custodians coexist in same protected.yaml - 22 tests (6 new) --- pocket_protector/cli.py | 24 +++++++ pocket_protector/file_keys.py | 71 ++++++++++++++++++-- pocket_protector/tests/test_cli.py | 59 +++++++++++++++++ pocket_protector/tests/test_file_keys.py | 83 ++++++++++++++++++++++++ 4 files changed, 231 insertions(+), 6 deletions(-) diff --git a/pocket_protector/cli.py b/pocket_protector/cli.py index df07f47..e5f97f8 100644 --- a/pocket_protector/cli.py +++ b/pocket_protector/cli.py @@ -160,6 +160,7 @@ def _get_cmd(prepare=False): # add subcommands cmd.add(add_key_custodian, name='init', doc='create a new protected') cmd.add(add_key_custodian) + cmd.add(add_raw_key_custodian) cmd.add(add_domain) cmd.add(rm_domain) @@ -214,6 +215,29 @@ def add_key_custodian(wkf, fast_crypto=None): return wkf.add_key_custodian(creds) +def add_raw_key_custodian(wkf): + 'add a key custodian with a generated raw key (no KDF, instant auth)' + from .file_keys import generate_raw_passphrase + user_id = prompt('User email: ') + passphrase = generate_raw_passphrase() + echo('') + echo('=' * 72) + echo(' GENERATED RAW KEY (copy this now, it will not be shown again):') + echo('') + echo(' ' + passphrase) + echo('') + echo(' Store this key securely. It is your passphrase.') + echo(' LOSS OF THIS KEY MEANS LOSS OF CUSTODIAN ACCESS.') + echo('=' * 72) + echo('') + confirm = prompt('Have you saved the key? Type YES to confirm: ') + if confirm.strip() != 'YES': + echo('Aborting. Key was not confirmed.') + return None + creds = Creds(user_id, passphrase) + return wkf.add_raw_key_custodian(creds) + + def add_domain(wkf, creds): 'add a new domain to the protected' echo('Adding new domain.') diff --git a/pocket_protector/file_keys.py b/pocket_protector/file_keys.py index e02bee9..d91b5fb 100644 --- a/pocket_protector/file_keys.py +++ b/pocket_protector/file_keys.py @@ -68,6 +68,11 @@ def _as_l(sub_schema): return _coerce(list, sub_schema) KDF_SENSITIVE = (nacl.pwhash.argon2id.OPSLIMIT_SENSITIVE, nacl.pwhash.argon2id.MEMLIMIT_MODERATE) # ~0.8s, 256MB - production default KDF_INTERACTIVE = (nacl.pwhash.argon2id.OPSLIMIT_INTERACTIVE, nacl.pwhash.argon2id.MEMLIMIT_INTERACTIVE) # ~0.1s, 64MB - dev/testing +# Raw key passphrase format: P<64 hex chars>P +# When passphrase has sufficient entropy, KDF can be bypassed entirely. +_RAW_KEY_RE = re.compile(r'^P([0-9a-f]{64})P$') +KDF_RAW_KEY = 'raw' # sentinel for raw-key custodians + # Keep module-level defaults for backward compat (v0 custodians use these) OPSLIMIT = KDF_SENSITIVE[0] MEMLIMIT = KDF_SENSITIVE[1] @@ -96,6 +101,24 @@ def _kdf(creds, salt, opslimit=None, memlimit=None): memlimit=memlimit if memlimit is not None else MEMLIMIT) +def _kdf_raw(creds, salt): + '''Derive key directly from high-entropy passphrase (P<64hex>P format). + No argon2 — the passphrase IS the entropy.''' + name = creds.name.encode('utf8') + passphrase = creds.passphrase.encode('utf8') + return hashlib.sha512(passphrase + salt + name).digest()[:nacl.public.PrivateKey.SIZE] + + +def generate_raw_passphrase(): + '''Generate a P<64hex>P raw-key passphrase with 256 bits of entropy.''' + return 'P' + os.urandom(32).hex() + 'P' + + +def is_raw_passphrase(passphrase): + '''Check if passphrase matches the P<64hex>P raw-key format.''' + return bool(_RAW_KEY_RE.match(passphrase)) + + def _decode(b64): ''' assert everything is version 0 @@ -128,6 +151,7 @@ class _KeyCustodian(object): _salt = attr.ib() _opslimit = attr.ib(default=None) _memlimit = attr.ib(default=None) + _raw_key = attr.ib(default=False) def encrypt_for(self, bytes): 'encrypt the passed bytes so that this key-custodian can decrypt' @@ -136,10 +160,14 @@ def encrypt_for(self, bytes): def decrypt_as(self, creds, bytes): 'decrypt the passed bytes that were encrypted for this key-custodian' assert creds.name == self.name + if self._raw_key: + key_bytes = _kdf_raw(creds, self._salt) + else: + key_bytes = _kdf(creds, self._salt, + opslimit=self._opslimit, + memlimit=self._memlimit) return nacl.public.SealedBox( - nacl.public.PrivateKey(_kdf(creds, self._salt, - opslimit=self._opslimit, - memlimit=self._memlimit))).decrypt(bytes) + nacl.public.PrivateKey(key_bytes)).decrypt(bytes) @classmethod def from_creds(cls, creds, opslimit=None, memlimit=None): @@ -150,7 +178,17 @@ def from_creds(cls, creds, opslimit=None, memlimit=None): memlimit=memlimit)) return cls( name=creds.name, public_key=private_key.public_key, salt=salt, - opslimit=opslimit, memlimit=memlimit) + opslimit=opslimit, memlimit=memlimit, raw_key=False) + @classmethod + def from_raw_creds(cls, creds): + 'create a new user from a raw-key (P<64hex>P) passphrase' + if not is_raw_passphrase(creds.passphrase): + raise PPError('raw-key passphrase must match P<64 hex chars>P format') + salt = os.urandom(8) + private_key = nacl.public.PrivateKey(_kdf_raw(creds, salt)) + return cls( + name=creds.name, public_key=private_key.public_key, salt=salt, + opslimit=None, memlimit=None, raw_key=True) @classmethod def from_data(cls, name, data): @@ -171,12 +209,22 @@ def from_data(cls, name, data): salt, public_key = payload[:8], payload[8:8 + nacl.public.PublicKey.SIZE] return cls( name=name, public_key=nacl.public.PublicKey(public_key), - salt=salt, opslimit=opslimit, memlimit=memlimit) + salt=salt, opslimit=opslimit, memlimit=memlimit, raw_key=False) + elif version == 2: + # v2: version(1) + salt(8) + pubkey(32) - raw key, no argon2 + payload = raw[1:] + salt, public_key = payload[:8], payload[8:8 + nacl.public.PublicKey.SIZE] + return cls( + name=name, public_key=nacl.public.PublicKey(public_key), + salt=salt, opslimit=None, memlimit=None, raw_key=True) else: raise PPError('unsupported pwdkm version %s' % version) def as_data(self): - if self._opslimit is not None and self._memlimit is not None: + if self._raw_key: + # v2 format + raw = b'\x02' + self._salt + self._public_key.encode() + elif self._opslimit is not None and self._memlimit is not None: # v1 format raw = (b'\x01' + struct.pack('P) passphrase, no KDF' + key_custodians = dict(self._key_custodians) + if creds.name in key_custodians: + raise PPError( + 'tried to add key custodian that already exists: {}'.format(creds.name)) + key_custodians[creds.name] = _KeyCustodian.from_raw_creds(creds) + return attr.evolve( + self, key_custodians=key_custodians, + log=self._new_log('created raw-key custodian {}', creds.name)) + def rm_key_custodian(self, key_custodian_name): 'remove key custodian and all domain ownerships' key_custodians = dict(self._key_custodians) diff --git a/pocket_protector/tests/test_cli.py b/pocket_protector/tests/test_cli.py index 7b04c0e..cd588d7 100644 --- a/pocket_protector/tests/test_cli.py +++ b/pocket_protector/tests/test_cli.py @@ -232,3 +232,62 @@ def test_cli_list_user_secrets(tmp_path, _fast_crypto): res = cc.run('pprotect list-user-secrets') assert DOMAIN_NAME in res.stdout assert SECRET_NAME in res.stdout + + +def test_cli_add_raw_key_custodian(tmp_path, _fast_crypto): + """Test the add-raw-key-custodian CLI subcommand.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + + # Init with kurt (normal custodian) + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + + # Add a domain and secret first + cc.run(['pprotect', 'add-domain'], input=[DOMAIN_NAME]) + cc.run(['pprotect', 'add-secret'], input=[DOMAIN_NAME, SECRET_NAME, SECRET_VALUE]) + + # Add raw-key custodian ("YES" confirms key was saved) + res = cc.run('pprotect add-raw-key-custodian', input=['raw@example.com', 'YES']) + # Output should contain the generated key + assert 'GENERATED RAW KEY' in res.stdout + # Extract the key from output + import re + match = re.search(r'(P[0-9a-f]{64}P)', res.stdout) + assert match, 'Expected raw key in output' + raw_passphrase = match.group(1) + + # Add raw custodian as owner + cc.run(['pprotect', 'add-owner'], input=[DOMAIN_NAME, 'raw@example.com']) + + # Raw custodian should be able to decrypt + raw_env = {'PPROTECT_USER': 'raw@example.com', 'PPROTECT_PASSPHRASE': raw_passphrase} + cc_raw = CommandChecker(cmd, chdir=str(tmp_path), env=raw_env, reraise=True) + res = cc_raw.run(['pprotect', 'decrypt-domain', DOMAIN_NAME]) + assert json.loads(res.stdout)[SECRET_NAME] == SECRET_VALUE + + +def test_cli_add_raw_key_custodian_abort(tmp_path, _fast_crypto): + """Test that declining confirmation aborts without creating custodian.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + + # Type "no" instead of "YES" - should abort + res = cc.run('pprotect add-raw-key-custodian', input=['raw@example.com', 'no']) + assert 'Aborting' in res.stdout + + # Custodian should not exist - add-owner should fail + import ruamel.yaml + data = ruamel.yaml.YAML().load(open(str(tmp_path) + '/protected.yaml').read()) + assert 'raw@example.com' not in data['key-custodians'] diff --git a/pocket_protector/tests/test_file_keys.py b/pocket_protector/tests/test_file_keys.py index 679e952..738b01d 100644 --- a/pocket_protector/tests/test_file_keys.py +++ b/pocket_protector/tests/test_file_keys.py @@ -247,3 +247,86 @@ def test_migrate_owner(_fast_crypto): kf2 = kf.add_key_custodian(carol) with pytest.raises(Exception): kf2.migrate_owner(bob.name, carol) + + +def test_raw_key_custodian(_fast_crypto): + """Test raw-key (v2) custodian creation, encrypt/decrypt, and round-trip.""" + from pocket_protector.file_keys import generate_raw_passphrase, is_raw_passphrase, _KeyCustodian + passphrase = generate_raw_passphrase() + assert is_raw_passphrase(passphrase) + assert not is_raw_passphrase('notarawkey') + assert not is_raw_passphrase('P1234P') # too short + + creds = file_keys.Creds('rawuser@example.com', passphrase) + kc = _KeyCustodian.from_raw_creds(creds) + # Round-trip through data + data = kc.as_data() + kc2 = _KeyCustodian.from_data(creds.name, data) + assert kc2._raw_key is True + # Encrypt/decrypt + enc = kc.encrypt_for(b'secret-data') + dec = kc2.decrypt_as(creds, enc) + assert dec == b'secret-data' + + +def test_raw_key_in_keyfile(_fast_crypto): + """Test raw-key custodian works end-to-end in a KeyFile.""" + from pocket_protector.file_keys import generate_raw_passphrase + passphrase = generate_raw_passphrase() + raw_creds = file_keys.Creds('raw@example.com', passphrase) + bob = file_keys.Creds('bob@example.com', 'bob-pass') + + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(bob) # v0 + kf = kf.add_raw_key_custodian(raw_creds) # v2 + kf = kf.add_domain('dev', bob.name) + kf = kf.add_owner('dev', raw_creds.name, bob) + kf = kf.set_secret('dev', 'db_pass', 'hunter2') + + # Both can decrypt + assert kf.decrypt_domain('dev', bob)['db_pass'] == 'hunter2' + assert kf.decrypt_domain('dev', raw_creds)['db_pass'] == 'hunter2' + + # Round-trip through file + kf.write() + kf2 = file_keys.KeyFile.from_file(tmp.name) + assert kf2 == kf + assert kf2.decrypt_domain('dev', raw_creds)['db_pass'] == 'hunter2' + assert kf2.check_creds(raw_creds) + assert kf2.check_creds(bob) + + +def test_v0_v1_v2_coexistence(_fast_crypto): + """Test all three custodian versions coexist in one KeyFile.""" + from pocket_protector.file_keys import KDF_INTERACTIVE, generate_raw_passphrase + v0_creds = file_keys.Creds('v0@example.com', 'pass0') + v1_creds = file_keys.Creds('v1@example.com', 'pass1') + raw_pass = generate_raw_passphrase() + v2_creds = file_keys.Creds('v2@example.com', raw_pass) + + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(v0_creds) + kf = kf.add_key_custodian(v1_creds, opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1]) + kf = kf.add_raw_key_custodian(v2_creds) + + kf = kf.add_domain('shared', v0_creds.name) + kf = kf.add_owner('shared', v1_creds.name, v0_creds) + kf = kf.add_owner('shared', v2_creds.name, v0_creds) + kf = kf.set_secret('shared', 'token', 'abc123') + + kf.write() + kf2 = file_keys.KeyFile.from_file(tmp.name) + assert kf2 == kf + for c in (v0_creds, v1_creds, v2_creds): + assert kf2.decrypt_domain('shared', c)['token'] == 'abc123' + assert kf2.check_creds(c) + + +def test_raw_key_invalid_passphrase(_fast_crypto): + """Test that from_raw_creds rejects non-raw passphrases.""" + from pocket_protector.file_keys import _KeyCustodian + bad_creds = file_keys.Creds('user@example.com', 'regular-password') + with pytest.raises(file_keys.PPError): + _KeyCustodian.from_raw_creds(bad_creds) From fbc621971affa2b58fd90c492a2ed49577c80c50 Mon Sep 17 00:00:00 2001 From: Mahmoud Hashemi Date: Sat, 14 Feb 2026 20:21:32 +0000 Subject: [PATCH 4/5] replace --fast-crypto with --key-type hard|fast|raw, add rekey-custodian Breaking change: --fast-crypto flag removed in favor of --key-type. pprotect add-key-custodian --key-type fast # was --fast-crypto pprotect add-key-custodian --key-type raw # was add-raw-key-custodian pprotect add-key-custodian # default: hard (unchanged) New: rekey-custodian command Changes an existing custodian's key type and passphrase in one step. Re-encrypts all domain ownership keys automatically. pprotect rekey-custodian --key-type fast # move existing custodian to fast KDF pprotect rekey-custodian --key-type raw # move to raw key (generates PP) Internal: KeyFile._replace_key_custodian extracted from set_key_custodian_passphrase for reuse by rekey_custodian. 23 tests pass. --- pocket_protector/cli.py | 82 +++++++++++++++++++++--------- pocket_protector/file_keys.py | 20 ++++++-- pocket_protector/tests/test_cli.py | 55 ++++++++++++++------ 3 files changed, 115 insertions(+), 42 deletions(-) diff --git a/pocket_protector/cli.py b/pocket_protector/cli.py index e5f97f8..d25c438 100644 --- a/pocket_protector/cli.py +++ b/pocket_protector/cli.py @@ -148,8 +148,8 @@ def _get_cmd(prepare=False): doc="the acting user's email credential") cmd.add('--passphrase-file', doc='path to a file containing only the passphrase, likely provided by a deployment system') - cmd.add('--fast-crypto', parse_as=True, - doc='use faster (less secure) KDF parameters, suitable for development') + cmd.add('--key-type', + doc='custodian key type: hard (default, slow KDF), fast (quick KDF), or raw (no KDF, generated key)') # add middlewares, outermost first ("first added, first called") cmd.add(mw_verify_creds) @@ -160,7 +160,7 @@ def _get_cmd(prepare=False): # add subcommands cmd.add(add_key_custodian, name='init', doc='create a new protected') cmd.add(add_key_custodian) - cmd.add(add_raw_key_custodian) + cmd.add(rekey_custodian) cmd.add(add_domain) cmd.add(rm_domain) @@ -205,21 +205,17 @@ def main(argv=None): # pragma: no cover (see note in tests.test_cli.test_main) The following subcommand handlers all update/write to a protected file (wkf). """ -def add_key_custodian(wkf, fast_crypto=None): - 'add a new key custodian to the protected' - echo('Adding new key custodian.') - creds = _get_new_creds() - if fast_crypto: - from .file_keys import KDF_INTERACTIVE - return wkf.add_key_custodian(creds, opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1]) - return wkf.add_key_custodian(creds) +_KEY_TYPES = ('hard', 'fast', 'raw') -def add_raw_key_custodian(wkf): - 'add a key custodian with a generated raw key (no KDF, instant auth)' - from .file_keys import generate_raw_passphrase - user_id = prompt('User email: ') - passphrase = generate_raw_passphrase() +def _validate_key_type(key_type): + if key_type and key_type not in _KEY_TYPES: + raise UsageError('--key-type must be one of: %s' % ', '.join(_KEY_TYPES)) + return key_type + + +def _show_raw_key_and_confirm(passphrase): + '''Display a generated raw key and require YES confirmation. Returns True if confirmed.''' echo('') echo('=' * 72) echo(' GENERATED RAW KEY (copy this now, it will not be shown again):') @@ -231,11 +227,26 @@ def add_raw_key_custodian(wkf): echo('=' * 72) echo('') confirm = prompt('Have you saved the key? Type YES to confirm: ') - if confirm.strip() != 'YES': - echo('Aborting. Key was not confirmed.') - return None - creds = Creds(user_id, passphrase) - return wkf.add_raw_key_custodian(creds) + return confirm.strip() == 'YES' + + +def add_key_custodian(wkf, key_type=None): + 'add a new key custodian to the protected' + from .file_keys import KDF_INTERACTIVE, generate_raw_passphrase + key_type = _validate_key_type(key_type) + echo('Adding new key custodian.') + if key_type == 'raw': + user_id = prompt('User email: ') + passphrase = generate_raw_passphrase() + if not _show_raw_key_and_confirm(passphrase): + echo('Aborting. Key was not confirmed.') + return None + creds = Creds(user_id, passphrase) + return wkf.add_raw_key_custodian(creds) + creds = _get_new_creds() + if key_type == 'fast': + return wkf.add_key_custodian(creds, opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1]) + return wkf.add_key_custodian(creds) def add_domain(wkf, creds): @@ -295,19 +306,42 @@ def rm_secret(wkf): return wkf.rm_secret(domain_name, secret_name) -def set_key_custodian_passphrase(wkf, fast_crypto=None): +def set_key_custodian_passphrase(wkf, key_type=None): 'update a key custodian passphrase' + from .file_keys import KDF_INTERACTIVE + key_type = _validate_key_type(key_type) user_id = prompt('User email: ') passphrase = prompt.secret('Current passphrase: ') creds = Creds(user_id, passphrase) _check_creds(wkf, creds) new_passphrase = prompt.secret('New passphrase: ', confirm=True) - if fast_crypto: - from .file_keys import KDF_INTERACTIVE + if key_type == 'fast': return wkf.set_key_custodian_passphrase(creds, new_passphrase, opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1]) return wkf.set_key_custodian_passphrase(creds, new_passphrase) +def rekey_custodian(wkf, key_type=None): + 'change a custodian key type (hard/fast/raw) and passphrase, re-encrypting all owned domains' + from .file_keys import KDF_INTERACTIVE, generate_raw_passphrase + key_type = _validate_key_type(key_type) or 'hard' + user_id = prompt('User email: ') + passphrase = prompt.secret('Current passphrase: ') + creds = Creds(user_id, passphrase) + _check_creds(wkf, creds) + if key_type == 'raw': + new_passphrase = generate_raw_passphrase() + if not _show_raw_key_and_confirm(new_passphrase): + echo('Aborting. Key was not confirmed.') + return None + new_creds = Creds(creds.name, new_passphrase) + return wkf.rekey_custodian(creds, new_creds, raw_key=True) + new_passphrase = prompt.secret('New passphrase: ', confirm=True) + new_creds = Creds(creds.name, new_passphrase) + if key_type == 'fast': + return wkf.rekey_custodian(creds, new_creds, opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1]) + return wkf.rekey_custodian(creds, new_creds) + + def rotate_domain_keys(wkf, creds): 'rotate the internal encryption keys for a given domain' domain_name = prompt('Domain name: ') diff --git a/pocket_protector/file_keys.py b/pocket_protector/file_keys.py index d91b5fb..775c0a5 100644 --- a/pocket_protector/file_keys.py +++ b/pocket_protector/file_keys.py @@ -640,6 +640,21 @@ def decrypt_domain(self, domain_name, creds): def set_key_custodian_passphrase(self, creds, new_passphrase, opslimit=None, memlimit=None): new_kc = _KeyCustodian.from_creds(Creds(creds.name, new_passphrase), opslimit=opslimit, memlimit=memlimit) + return self._replace_key_custodian(creds, new_kc, 'updated key custodian passphrase') + + def rekey_custodian(self, creds, new_creds, raw_key=False, opslimit=None, memlimit=None): + '''Replace a custodian's key material entirely (new passphrase, new KDF type). + The custodian name (email) must match between creds and new_creds.''' + if creds.name != new_creds.name: + raise PPError('rekey requires same custodian name, got %s and %s' % (creds.name, new_creds.name)) + if raw_key: + new_kc = _KeyCustodian.from_raw_creds(new_creds) + else: + new_kc = _KeyCustodian.from_creds(new_creds, opslimit=opslimit, memlimit=memlimit) + return self._replace_key_custodian(creds, new_kc, 'rekeyed custodian') + + def _replace_key_custodian(self, creds, new_kc, log_action): + '''Internal: replace a custodian's key material and re-encrypt domain ownership.''' cur_kc = self._key_custodians[creds.name] key_custodians = dict(self._key_custodians) key_custodians[creds.name] = new_kc @@ -654,9 +669,8 @@ def set_key_custodian_passphrase(self, creds, new_passphrase, opslimit=None, mem return attr.evolve( self, key_custodians=key_custodians, domains=domains, log=self._new_log( - 'updated key custodian passphrase for {} (updated domains -> {}){}', - creds.name, ", ".join(updated), - ' (custom KDF params)' if (opslimit or memlimit) else '')) + '{} for {} (updated domains -> {})', + log_action, creds.name, ', '.join(updated))) def check_creds(self, creds): try: diff --git a/pocket_protector/tests/test_cli.py b/pocket_protector/tests/test_cli.py index cd588d7..0ea9d7e 100644 --- a/pocket_protector/tests/test_cli.py +++ b/pocket_protector/tests/test_cli.py @@ -156,15 +156,15 @@ def test_main(tmp_path): assert res.decode('utf8').startswith('pocket_protector version') -def test_cli_fast_crypto_flag(tmp_path, _fast_crypto): - """Test the --fast-crypto flag for init.""" +def test_cli_key_type_fast(tmp_path, _fast_crypto): + """Test the --key-type fast flag for init.""" cmd = cli._get_cmd() cc = CommandChecker(cmd, reraise=True) protected_path = str(tmp_path) + '/protected.yaml' # Init with fast crypto - res = cc.run(f'pprotect init --file {protected_path} --fast-crypto', + res = cc.run(f'pprotect init --file {protected_path} --key-type fast', input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) assert os.path.exists(protected_path) @@ -235,24 +235,21 @@ def test_cli_list_user_secrets(tmp_path, _fast_crypto): def test_cli_add_raw_key_custodian(tmp_path, _fast_crypto): - """Test the add-raw-key-custodian CLI subcommand.""" + """Test add-key-custodian --key-type raw.""" cmd = cli._get_cmd() cc = CommandChecker(cmd, reraise=True) protected_path = str(tmp_path) + '/protected.yaml' - # Init with kurt (normal custodian) cc.run('pprotect init --file %s' % protected_path, input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) - # Add a domain and secret first cc.run(['pprotect', 'add-domain'], input=[DOMAIN_NAME]) cc.run(['pprotect', 'add-secret'], input=[DOMAIN_NAME, SECRET_NAME, SECRET_VALUE]) - # Add raw-key custodian ("YES" confirms key was saved) - res = cc.run('pprotect add-raw-key-custodian', input=['raw@example.com', 'YES']) + res = cc.run('pprotect add-key-custodian --key-type raw', input=['raw@example.com', 'YES']) # Output should contain the generated key assert 'GENERATED RAW KEY' in res.stdout # Extract the key from output @@ -260,11 +257,8 @@ def test_cli_add_raw_key_custodian(tmp_path, _fast_crypto): match = re.search(r'(P[0-9a-f]{64}P)', res.stdout) assert match, 'Expected raw key in output' raw_passphrase = match.group(1) - # Add raw custodian as owner cc.run(['pprotect', 'add-owner'], input=[DOMAIN_NAME, 'raw@example.com']) - - # Raw custodian should be able to decrypt raw_env = {'PPROTECT_USER': 'raw@example.com', 'PPROTECT_PASSPHRASE': raw_passphrase} cc_raw = CommandChecker(cmd, chdir=str(tmp_path), env=raw_env, reraise=True) res = cc_raw.run(['pprotect', 'decrypt-domain', DOMAIN_NAME]) @@ -282,12 +276,43 @@ def test_cli_add_raw_key_custodian_abort(tmp_path, _fast_crypto): kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) - # Type "no" instead of "YES" - should abort - res = cc.run('pprotect add-raw-key-custodian', input=['raw@example.com', 'no']) + res = cc.run('pprotect add-key-custodian --key-type raw', input=['raw@example.com', 'no']) assert 'Aborting' in res.stdout - - # Custodian should not exist - add-owner should fail + # Custodian should not exist import ruamel.yaml data = ruamel.yaml.YAML().load(open(str(tmp_path) + '/protected.yaml').read()) assert 'raw@example.com' not in data['key-custodians'] + + +def test_cli_rekey_custodian(tmp_path, _fast_crypto): + """Test rekey-custodian to change key type.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + + # Init with kurt (hard key type) + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + + # Add domain and secret + cc.run(['pprotect', 'add-domain'], input=[DOMAIN_NAME]) + cc.run(['pprotect', 'add-secret'], input=[DOMAIN_NAME, SECRET_NAME, SECRET_VALUE]) + + # Rekey to raw + res = cc.run('pprotect rekey-custodian --key-type raw', + input=[KURT_EMAIL, KURT_PHRASE, 'YES']) + assert 'GENERATED RAW KEY' in res.stdout + import re + match = re.search(r'(P[0-9a-f]{64}P)', res.stdout) + assert match + raw_passphrase = match.group(1) + + # Old passphrase should no longer work, new raw key should + raw_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': raw_passphrase} + cc_raw = CommandChecker(cmd, chdir=str(tmp_path), env=raw_env, reraise=True) + res = cc_raw.run(['pprotect', 'decrypt-domain', DOMAIN_NAME]) + assert json.loads(res.stdout)[SECRET_NAME] == SECRET_VALUE From 5f43a29fede66e9dc1a9854ae208820120e8cf91 Mon Sep 17 00:00:00 2001 From: Mahmoud Hashemi Date: Sat, 14 Feb 2026 20:37:44 +0000 Subject: [PATCH 5/5] more coverage --- pocket_protector/tests/test_cli.py | 182 +++++++++++++++++++++++ pocket_protector/tests/test_file_keys.py | 117 +++++++++++++++ 2 files changed, 299 insertions(+) diff --git a/pocket_protector/tests/test_cli.py b/pocket_protector/tests/test_cli.py index 0ea9d7e..0b522e2 100644 --- a/pocket_protector/tests/test_cli.py +++ b/pocket_protector/tests/test_cli.py @@ -316,3 +316,185 @@ def test_cli_rekey_custodian(tmp_path, _fast_crypto): cc_raw = CommandChecker(cmd, chdir=str(tmp_path), env=raw_env, reraise=True) res = cc_raw.run(['pprotect', 'decrypt-domain', DOMAIN_NAME]) assert json.loads(res.stdout)[SECRET_NAME] == SECRET_VALUE + + +def test_cli_invalid_key_type(tmp_path, _fast_crypto): + """Test that --key-type with an invalid value fails.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + res = cc.fail('pprotect init --file %s --key-type bogus' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + assert 'key-type' in res.stderr.lower() or 'hard' in res.stderr.lower() + + +def test_cli_passphrase_file_not_found(tmp_path, _fast_crypto): + """Test that a nonexistent passphrase file gives a clear error.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + cc = CommandChecker(cmd, chdir=str(tmp_path), reraise=True) + res = cc.fail('pprotect decrypt-domain --non-interactive --passphrase-file /nonexistent/path first-domain', + env={'PPROTECT_USER': KURT_EMAIL}) + assert 'passphrase' in res.stderr.lower() and 'file' in res.stderr.lower() + + +def test_cli_list_domains_empty(tmp_path, _fast_crypto): + """Test list-domains with no domains shows message on stderr.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + cc = CommandChecker(cmd, chdir=str(tmp_path), reraise=True) + res = cc.run('pprotect list-domains') + assert 'No domains' in res.stderr + + +def test_cli_list_domain_secrets_empty(tmp_path, _fast_crypto): + """Test list-domain-secrets with no secrets shows message on stderr.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + cc.run(['pprotect', 'add-domain'], input=[DOMAIN_NAME]) + res = cc.run(['pprotect', 'list-domain-secrets', DOMAIN_NAME]) + assert 'No secrets' in res.stderr + + +def test_cli_list_all_secrets_empty(tmp_path, _fast_crypto): + """Test list-all-secrets with no secrets shows message on stderr.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + cc.run(['pprotect', 'add-domain'], input=[DOMAIN_NAME]) + res = cc.run('pprotect list-all-secrets') + assert 'No secrets' in res.stderr + + +def test_cli_set_passphrase_fast(tmp_path, _fast_crypto): + """Test set-key-custodian-passphrase with --key-type fast.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + cc.run(['pprotect', 'add-domain'], input=[DOMAIN_NAME]) + cc.run(['pprotect', 'add-secret'], input=[DOMAIN_NAME, SECRET_NAME, SECRET_VALUE]) + new_phrase = KURT_PHRASE + '_new' + cc.run('pprotect set-key-custodian-passphrase --key-type fast', + input=[KURT_EMAIL, KURT_PHRASE, new_phrase, new_phrase]) + # Verify decrypt works with new passphrase + new_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': new_phrase} + cc2 = CommandChecker(cmd, chdir=str(tmp_path), env=new_env, reraise=True) + res = cc2.run(['pprotect', 'decrypt-domain', DOMAIN_NAME]) + assert json.loads(res.stdout)[SECRET_NAME] == SECRET_VALUE + + +def test_cli_rekey_custodian_fast(tmp_path, _fast_crypto): + """Test rekey-custodian --key-type fast.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + cc.run(['pprotect', 'add-domain'], input=[DOMAIN_NAME]) + cc.run(['pprotect', 'add-secret'], input=[DOMAIN_NAME, SECRET_NAME, SECRET_VALUE]) + new_phrase = KURT_PHRASE + '_fast' + cc.run('pprotect rekey-custodian --key-type fast', + input=[KURT_EMAIL, KURT_PHRASE, new_phrase, new_phrase]) + new_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': new_phrase} + cc2 = CommandChecker(cmd, chdir=str(tmp_path), env=new_env, reraise=True) + res = cc2.run(['pprotect', 'decrypt-domain', DOMAIN_NAME]) + assert json.loads(res.stdout)[SECRET_NAME] == SECRET_VALUE + + +def test_cli_rekey_custodian_hard(tmp_path, _fast_crypto): + """Test rekey-custodian --key-type hard.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + cc.run(['pprotect', 'add-domain'], input=[DOMAIN_NAME]) + cc.run(['pprotect', 'add-secret'], input=[DOMAIN_NAME, SECRET_NAME, SECRET_VALUE]) + new_phrase = KURT_PHRASE + '_hard' + cc.run('pprotect rekey-custodian --key-type hard', + input=[KURT_EMAIL, KURT_PHRASE, new_phrase, new_phrase]) + new_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': new_phrase} + cc2 = CommandChecker(cmd, chdir=str(tmp_path), env=new_env, reraise=True) + res = cc2.run(['pprotect', 'decrypt-domain', DOMAIN_NAME]) + assert json.loads(res.stdout)[SECRET_NAME] == SECRET_VALUE + + +def test_cli_rekey_custodian_abort(tmp_path, _fast_crypto): + """Test rekey-custodian --key-type raw abort when user types 'no'.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + # Rekey to raw but decline confirmation + res = cc.run('pprotect rekey-custodian --key-type raw', + input=[KURT_EMAIL, KURT_PHRASE, 'no']) + assert 'Aborting' in res.stdout + # Old passphrase should still work + res = cc.run(['pprotect', 'list-domains']) + # No crash means old creds still valid + + +def test_cli_migrate_owner_no_domains(tmp_path, _fast_crypto): + """Test migrate-owner when the user owns no domains.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + # Add MH as custodian + cc.run('pprotect add-key-custodian', input=[MH_EMAIL, MH_PHRASE, MH_PHRASE]) + # MH owns no domains, try migrate-owner as MH + mh_env = {'PPROTECT_USER': MH_EMAIL, 'PPROTECT_PASSPHRASE': MH_PHRASE} + cc_mh = CommandChecker(cmd, chdir=str(tmp_path), env=mh_env, reraise=True) + res = cc_mh.run('pprotect migrate-owner', input=[KURT_EMAIL]) + assert 'do not own any domains' in res.stdout.lower() + + +def test_cli_migrate_owner_abort(tmp_path, _fast_crypto): + """Test migrate-owner abort when user types 'n' at confirm.""" + cmd = cli._get_cmd() + cc = CommandChecker(cmd, reraise=True) + protected_path = str(tmp_path) + '/protected.yaml' + cc.run('pprotect init --file %s' % protected_path, + input=[KURT_EMAIL, KURT_PHRASE, KURT_PHRASE]) + kurt_env = {'PPROTECT_USER': KURT_EMAIL, 'PPROTECT_PASSPHRASE': KURT_PHRASE} + cc = CommandChecker(cmd, chdir=str(tmp_path), env=kurt_env, reraise=True) + cc.run('pprotect add-key-custodian', input=[MH_EMAIL, MH_PHRASE, MH_PHRASE]) + cc.run(['pprotect', 'add-domain'], input=[DOMAIN_NAME]) + # Abort migrate-owner + res = cc.run('pprotect migrate-owner', input=[MH_EMAIL, 'n']) + assert 'Aborting' in res.stdout + # Domain should still be owned by kurt, not MH + mh_env = {'PPROTECT_USER': MH_EMAIL, 'PPROTECT_PASSPHRASE': MH_PHRASE} + cc_mh = CommandChecker(cmd, chdir=str(tmp_path), env=mh_env, reraise=True) + # MH should not be owner - list-user-secrets should show no domains + res = cc_mh.run('pprotect list-user-secrets') + assert 'does not own any domains' in res.stderr \ No newline at end of file diff --git a/pocket_protector/tests/test_file_keys.py b/pocket_protector/tests/test_file_keys.py index 738b01d..c6e7926 100644 --- a/pocket_protector/tests/test_file_keys.py +++ b/pocket_protector/tests/test_file_keys.py @@ -330,3 +330,120 @@ def test_raw_key_invalid_passphrase(_fast_crypto): bad_creds = file_keys.Creds('user@example.com', 'regular-password') with pytest.raises(file_keys.PPError): _KeyCustodian.from_raw_creds(bad_creds) + +import base64 + + +def test_decode_unsupported_version(_fast_crypto): + """Test that _decode() raises PPError for non-zero version byte.""" + from pocket_protector.file_keys import _decode + bad_b64 = base64.b64encode(b'\x05' + b'some payload').decode('utf8') + with pytest.raises(file_keys.PPError, match='not supported'): + _decode(bad_b64) + + +def test_custodian_from_data_unsupported_version(_fast_crypto): + """Test _KeyCustodian.from_data with unknown version byte.""" + from pocket_protector.file_keys import _KeyCustodian + raw = b'\x63' + b'\x00' * 40 + encoded = base64.b64encode(raw).decode('utf8') + with pytest.raises(file_keys.PPError, match='unsupported'): + _KeyCustodian.from_data('user@example.com', {'pwdkm': encoded}) + + +def test_decrypt_domain_non_owner(_fast_crypto): + """Test that decrypt_domain raises PPError for non-owner.""" + bob = file_keys.Creds('bob@example.com', 'bob-pass') + alice = file_keys.Creds('alice@example.com', 'alice-pass') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(bob) + kf = kf.add_key_custodian(alice) + kf = kf.add_domain('domain', bob.name) # only bob is owner + kf = kf.set_secret('domain', 'key', 'val') + with pytest.raises(file_keys.PPError, match='not an owner'): + kf.decrypt_domain('domain', alice) + + +def test_rm_owner_not_an_owner(_fast_crypto): + """Test rm_owner raises PPError when custodian is not an owner.""" + bob = file_keys.Creds('bob@example.com', 'bob-pass') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(bob) + kf = kf.add_domain('domain', bob.name) + with pytest.raises(file_keys.PPError, match='not an owner'): + kf.rm_owner('domain', 'nonexistent@example.com') + + +def test_rm_owner_last_owner(_fast_crypto): + """Test rm_owner raises PPError when removing the last owner.""" + bob = file_keys.Creds('bob@example.com', 'bob-pass') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(bob) + kf = kf.add_domain('domain', bob.name) + with pytest.raises(file_keys.PPError, match='irretrievable'): + kf.rm_owner('domain', bob.name) + + +def test_key_domain_missing_secret(_fast_crypto): + """Test _KeyDomain raises PPKeyError on missing key.""" + bob = file_keys.Creds('bob@example.com', 'bob-pass') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(bob) + kf = kf.add_domain('domain', bob.name) + kf = kf.set_secret('domain', 'exists', 'val') + decrypted = kf.decrypt_domain('domain', bob) + with pytest.raises(file_keys.PPKeyError, match='no secret'): + decrypted['nonexistent'] + + +def test_add_raw_key_custodian_duplicate(_fast_crypto): + """Test add_raw_key_custodian raises PPError for duplicate email.""" + from pocket_protector.file_keys import generate_raw_passphrase + passphrase = generate_raw_passphrase() + creds = file_keys.Creds('raw@example.com', passphrase) + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_raw_key_custodian(creds) + with pytest.raises(file_keys.PPError, match='already exists'): + kf.add_raw_key_custodian(creds) + + +def test_rekey_custodian_name_mismatch(_fast_crypto): + """Test rekey_custodian raises PPError when names differ.""" + bob = file_keys.Creds('bob@example.com', 'bob-pass') + alice = file_keys.Creds('alice@example.com', 'alice-pass') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(bob) + with pytest.raises(file_keys.PPError, match='same custodian name'): + kf.rekey_custodian(bob, alice) + + +def test_rekey_custodian_kdf(_fast_crypto): + """Test rekey_custodian with raw_key=False (KDF path).""" + bob = file_keys.Creds('bob@example.com', 'bob-pass') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(bob) + kf = kf.add_domain('domain', bob.name) + kf = kf.set_secret('domain', 'key', 'val') + new_bob = file_keys.Creds('bob@example.com', 'new-bob-pass') + kf = kf.rekey_custodian(bob, new_bob, raw_key=False) + assert kf.decrypt_domain('domain', new_bob)['key'] == 'val' + assert kf.check_creds(new_bob) + assert not kf.check_creds(bob) + + +def test_truncate_audit_log_no_op(_fast_crypto): + """Test truncate_audit_log returns same object when log is short.""" + creds = file_keys.Creds('user@example.com', 'pass') + tmp = tempfile.NamedTemporaryFile() + kf = file_keys.KeyFile.create(path=tmp.name) + kf = kf.add_key_custodian(creds) + # kf has 2 log entries (create + add_key_custodian) + result = kf.truncate_audit_log(100) + assert result is kf \ No newline at end of file