Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ nosetests.xml
*.sw[op]

.cache/
.venv/
2 changes: 2 additions & 0 deletions pocket_protector/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from ._version import __version__
from .file_keys import KeyFile, Creds, PPError, KDF_SENSITIVE, KDF_INTERACTIVE
14 changes: 10 additions & 4 deletions pocket_protector/__main__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import os
import sys

from .cli import main
from .cli import main as _main

if __name__ == '__main__': # pragma: no cover

def main():
try:
sys.exit(main() or 0)
sys.exit(_main() or 0)
except Exception:
if os.getenv('PPROTECT_ENABLE_DEBUG'):
import pdb;pdb.post_mortem()
import pdb
pdb.post_mortem()
raise


if __name__ == '__main__':
main()
7 changes: 1 addition & 6 deletions pocket_protector/_version.py
Original file line number Diff line number Diff line change
@@ -1,6 +1 @@

# The version of PocketProtector, used in the version subcommand, as
# well as in setup.py. For full release directions, see the bottom of
# setup.py.

__version__ = '20.0.2dev'
__version__ = '24.0.0dev'
116 changes: 99 additions & 17 deletions pocket_protector/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,6 @@
# TODO: custodian-signed values. allow custodians to sign values
# added/set by others, then produced reports on which secrets have been
# updated/changed but not signed yet. enables a review/audit mechanism.

try:
unicode
except NameError:
# py3
unicode = str


def _get_text(inp):
if not isinstance(inp, unicode):
return inp.decode('utf8')
return inp


def _create_protected(path):
if os.path.exists(path):
raise UsageError('Protected file already exists: %s' % path, 2)
Expand Down Expand Up @@ -117,7 +103,7 @@ def _get_creds(kf,
passphrase = prompt.secret('Passphrase: ', confirm=False)
passphrase_source = 'stdin'

creds = Creds(_get_text(user or ''), _get_text(passphrase or ''),
creds = Creds(user or '', passphrase or '',
name_source=user_source, passphrase_source=passphrase_source)
_check_creds(kf, creds)

Expand Down Expand Up @@ -162,6 +148,8 @@ def _get_cmd(prepare=False):
doc="the acting user's email credential")
cmd.add('--passphrase-file',
doc='path to a file containing only the passphrase, likely provided by a deployment system')
cmd.add('--key-type',
doc='custodian key type: hard (default, slow KDF), fast (quick KDF), or raw (no KDF, generated key)')

# add middlewares, outermost first ("first added, first called")
cmd.add(mw_verify_creds)
Expand All @@ -172,6 +160,7 @@ def _get_cmd(prepare=False):
# add subcommands
cmd.add(add_key_custodian, name='init', doc='create a new protected')
cmd.add(add_key_custodian)
cmd.add(rekey_custodian)

cmd.add(add_domain)
cmd.add(rm_domain)
Expand All @@ -186,6 +175,9 @@ def _get_cmd(prepare=False):
cmd.add(set_key_custodian_passphrase)
cmd.add(rotate_domain_keys)

cmd.add(migrate_owner)
cmd.add(list_user_secrets)

cmd.add(decrypt_domain, posargs={'count': 1, 'provides': 'domain_name'})

cmd.add(list_domains)
Expand Down Expand Up @@ -213,10 +205,47 @@ def main(argv=None): # pragma: no cover (see note in tests.test_cli.test_main)
The following subcommand handlers all update/write to a protected file (wkf).
"""

def add_key_custodian(wkf):
_KEY_TYPES = ('hard', 'fast', 'raw')


def _validate_key_type(key_type):
if key_type and key_type not in _KEY_TYPES:
raise UsageError('--key-type must be one of: %s' % ', '.join(_KEY_TYPES))
return key_type


def _show_raw_key_and_confirm(passphrase):
'''Display a generated raw key and require YES confirmation. Returns True if confirmed.'''
echo('')
echo('=' * 72)
echo(' GENERATED RAW KEY (copy this now, it will not be shown again):')
echo('')
echo(' ' + passphrase)
echo('')
echo(' Store this key securely. It is your passphrase.')
echo(' LOSS OF THIS KEY MEANS LOSS OF CUSTODIAN ACCESS.')
echo('=' * 72)
echo('')
confirm = prompt('Have you saved the key? Type YES to confirm: ')
return confirm.strip() == 'YES'


def add_key_custodian(wkf, key_type=None):
'add a new key custodian to the protected'
from .file_keys import KDF_INTERACTIVE, generate_raw_passphrase
key_type = _validate_key_type(key_type)
echo('Adding new key custodian.')
if key_type == 'raw':
user_id = prompt('User email: ')
passphrase = generate_raw_passphrase()
if not _show_raw_key_and_confirm(passphrase):
echo('Aborting. Key was not confirmed.')
return None
creds = Creds(user_id, passphrase)
return wkf.add_raw_key_custodian(creds)
creds = _get_new_creds()
if key_type == 'fast':
return wkf.add_key_custodian(creds, opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1])
return wkf.add_key_custodian(creds)


Expand Down Expand Up @@ -277,22 +306,63 @@ def rm_secret(wkf):
return wkf.rm_secret(domain_name, secret_name)


def set_key_custodian_passphrase(wkf):
def set_key_custodian_passphrase(wkf, key_type=None):
'update a key custodian passphrase'
from .file_keys import KDF_INTERACTIVE
key_type = _validate_key_type(key_type)
user_id = prompt('User email: ')
passphrase = prompt.secret('Current passphrase: ')
creds = Creds(user_id, passphrase)
_check_creds(wkf, creds)
new_passphrase = prompt.secret('New passphrase: ', confirm=True)
if key_type == 'fast':
return wkf.set_key_custodian_passphrase(creds, new_passphrase, opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1])
return wkf.set_key_custodian_passphrase(creds, new_passphrase)


def rekey_custodian(wkf, key_type=None):
'change a custodian key type (hard/fast/raw) and passphrase, re-encrypting all owned domains'
from .file_keys import KDF_INTERACTIVE, generate_raw_passphrase
key_type = _validate_key_type(key_type) or 'hard'
user_id = prompt('User email: ')
passphrase = prompt.secret('Current passphrase: ')
creds = Creds(user_id, passphrase)
_check_creds(wkf, creds)
if key_type == 'raw':
new_passphrase = generate_raw_passphrase()
if not _show_raw_key_and_confirm(new_passphrase):
echo('Aborting. Key was not confirmed.')
return None
new_creds = Creds(creds.name, new_passphrase)
return wkf.rekey_custodian(creds, new_creds, raw_key=True)
new_passphrase = prompt.secret('New passphrase: ', confirm=True)
new_creds = Creds(creds.name, new_passphrase)
if key_type == 'fast':
return wkf.rekey_custodian(creds, new_creds, opslimit=KDF_INTERACTIVE[0], memlimit=KDF_INTERACTIVE[1])
return wkf.rekey_custodian(creds, new_creds)


def rotate_domain_keys(wkf, creds):
'rotate the internal encryption keys for a given domain'
domain_name = prompt('Domain name: ')
return wkf.rotate_domain_key(domain_name, creds)


def migrate_owner(wkf, creds):
'grant a custodian ownership of all domains you own'
new_owner = prompt('New owner email: ')
owned = wkf.get_custodian_domains(creds.name)
if not owned:
echo('You do not own any domains.')
return None
echo('Will add %s as owner to: %s' % (new_owner, ', '.join(sorted(owned))))
confirm = prompt('Proceed? [y/N] ')
if not confirm.lower().startswith('y'):
echo('Aborting.')
return None
return wkf.migrate_owner(new_owner, creds)


"""
Read-only operations follow
"""
Expand Down Expand Up @@ -350,6 +420,18 @@ def list_audit_log(kf):
return


def list_user_secrets(kf, creds):
'display domains and secrets accessible to the authenticated user'
owned = kf.get_custodian_domains(creds.name)
if not owned:
echo.err('User %s does not own any domains.' % creds.name)
return
for domain_name in sorted(owned):
secrets = kf.get_domain_secret_names(domain_name)
echo('%s: %s' % (domain_name, ', '.join(secrets) if secrets else '(no secrets)'))
return


"""
End subcommand handlers

Expand Down
Loading