Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ jobs:
run: uv sync --all-extras
- name: Run pytest
run: uv run pytest
- name: Run examples
run: uv run pytest examples/
7 changes: 7 additions & 0 deletions didcomm_messaging/crypto/backend/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
"""Cryptography and Secrets Management backends."""

from didcomm_messaging.crypto.backend.basic import (
FileBasedSecretsManager,
InMemorySecretsManager,
)

__all__ = ["FileBasedSecretsManager", "InMemorySecretsManager"]
11 changes: 11 additions & 0 deletions didcomm_messaging/crypto/backend/authlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ def from_verification_method(cls, vm: VerificationMethod) -> "AuthlibKey":
key = cls.multikey_to_key(multikey)
return cls(key, kid)

if vm.type == "JsonWebKey2020":
jwk = vm.public_key_jwk
if not jwk:
raise ValueError("JWK verification method missing key")

try:
key = JsonWebKey.import_key(jwk)
except Exception as err:
raise ValueError("Invalid JWK") from err
return cls(key, kid)

codec = cls.type_to_codec.get(vm.type)
if not codec:
raise ValueError("Unsupported verification method type: {vm_type}")
Expand Down
81 changes: 80 additions & 1 deletion didcomm_messaging/crypto/backend/basic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
"""Basic Crypto Implementations."""

from typing import Optional
import atexit
import json
import shutil
from pathlib import Path
from typing import Callable, Dict, Optional

from didcomm_messaging.crypto.base import S, SecretsManager


Expand All @@ -18,3 +23,77 @@ async def get_secret_by_kid(self, kid: str) -> Optional[S]:
async def add_secret(self, secret: S) -> None:
"""Add a secret to the secrets manager."""
self.secrets[secret.kid] = secret


class FileBasedSecretsManager(SecretsManager[S]):
"""File-based Secrets Manager with in-memory caching and auto-save.

Secrets are stored in memory for fast access and persisted to a JSONL file.
The file is saved automatically on program exit via atexit, and can also
be flushed explicitly using the flush() method.

Requires serializer and deserializer callbacks to convert between SecretKey
objects and their JSON-serializable representation.
"""

def __init__(
self,
path: str,
serializer: Callable[[S], Dict],
deserializer: Callable[[str, Dict], S],
secrets: Optional[dict] = None,
):
"""Initialize the FileBasedSecretsManager.

Args:
path: Full path to the JSONL file for storing secrets.
serializer: Callback to serialize a SecretKey to a dict.
deserializer: Callback to deserialize a dict to a SecretKey.
Takes (kid, serialized_dict) as arguments.
secrets: Optional initial secrets to load (file takes precedence).
"""
self._path = Path(path)
self._serializer = serializer
self._deserializer = deserializer
self._secrets: Dict[str, S] = secrets or {}

if self._path.exists():
with open(self._path) as f:
for line in f:
line = line.strip()
if not line:
continue
data = json.loads(line)
kid = data.get("kid")
if kid:
self._secrets[kid] = self._deserializer(kid, data)

atexit.register(self._sync)

@property
def path(self) -> str:
"""Return the path to the secrets file."""
return str(self._path)

async def get_secret_by_kid(self, kid: str) -> Optional[S]:
"""Get a secret by its kid."""
return self._secrets.get(kid)

async def add_secret(self, secret: S) -> None:
"""Add a secret to the secrets manager."""
self._secrets[secret.kid] = secret

async def flush(self) -> None:
"""Explicitly save secrets to the file."""
self._sync()

def _sync(self) -> None:
"""Write secrets to file (called on atexit and flush)."""
tmp_path = self._path.with_suffix(".tmp")
self._path.parent.mkdir(parents=True, exist_ok=True)
with open(tmp_path, "w") as f:
for kid, secret in self._secrets.items():
data = self._serializer(secret)
data["kid"] = kid
f.write(json.dumps(data) + "\n")
shutil.move(tmp_path, self._path)
8 changes: 5 additions & 3 deletions didcomm_messaging/crypto/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ async def ecdh_es_encrypt(self, to_keys: Sequence[P], message: bytes) -> bytes:
"""Encode a message into DIDComm v2 anonymous encryption."""

@abstractmethod
async def ecdh_es_decrypt(self, wrapper: Union[str, bytes], recip_key: S) -> bytes:
async def ecdh_es_decrypt(
self, enc_message: Union[str, bytes], recip_key: S
) -> bytes:
"""Decode a message from DIDComm v2 anonymous encryption."""

@abstractmethod
Expand All @@ -106,7 +108,7 @@ async def ecdh_1pu_encrypt(
@abstractmethod
async def ecdh_1pu_decrypt(
self,
wrapper: Union[str, bytes],
enc_message: Union[str, bytes],
recip_key: S,
sender_key: P,
) -> bytes:
Expand All @@ -121,7 +123,7 @@ def verification_method_to_public_key(cls, vm: VerificationMethod) -> P:
class SecretsManager(ABC, Generic[S]):
"""Secrets Resolver interface.

Thie secrets resolver may be used to supplement the CryptoService backend to provide
The secrets resolver may be used to supplement the CryptoService backend to provide
greater flexibility.
"""

Expand Down
3 changes: 1 addition & 2 deletions didcomm_messaging/legacy/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ def sign_message_field(field_value: Dict, signer: str, secret: bytes) -> Dict:
)

return {
"@type": "did:sov:BzCbsNYhMrjHiqZDTUASHg;spec"
"/signature/1.0/ed25519Sha512_single",
"@type": "did:sov:BzCbsNYhMrjHiqZDTUASHg;spec/signature/1.0/ed25519Sha512_single",
"signer": signer,
"sig_data": sig_data,
"signature": signature,
Expand Down
121 changes: 121 additions & 0 deletions examples/authlib_file_secrets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Example of using authlib crypto with file-based secrets storage."""

import asyncio
import json
import tempfile
from pathlib import Path

from authlib.jose import OKPKey

from didcomm_messaging.crypto.backend.authlib import (
AuthlibCryptoService,
AuthlibSecretKey,
)
from didcomm_messaging.crypto.backend.basic import FileBasedSecretsManager
from didcomm_messaging.multiformats.multibase import Base64UrlEncoder
from didcomm_messaging.packaging import PackagingService
from didcomm_messaging.resolver import PrefixResolver
from didcomm_messaging.resolver.jwk import JWKResolver

b64 = Base64UrlEncoder()


def create_jwk_did(jwk: dict) -> str:
"""Create a did:jwk from a JWK dict."""
encoded = b64.encode(json.dumps(jwk).encode())
return f"did:jwk:{encoded}"


async def main():
"""Run the example."""
# Create a temporary file for secrets storage
secrets_file = tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False)
secrets_path = secrets_file.name
secrets_file.close()

# Serializer: Convert AuthlibSecretKey to JWK dict
def serialize_secret(secret: AuthlibSecretKey) -> dict:
return secret.key.as_dict(is_private=True)

# Deserializer: Convert JWK dict back to AuthlibSecretKey
def deserialize_secret(kid: str, data: dict) -> AuthlibSecretKey:
key = OKPKey.import_key(data)
return AuthlibSecretKey(key, kid)

# Create the file-based secrets manager
secrets = FileBasedSecretsManager(secrets_path, serialize_secret, deserialize_secret)

# Generate keys for sender and recipient
# Using X25519 for both since it supports key agreement (encryption)
sender_sk = OKPKey.generate_key("X25519", is_private=True)
recipient_sk = OKPKey.generate_key("X25519", is_private=True)

# Get JWKs and create DIDs
# For 1PU (authenticated encryption), we need key agreement keys
sender_jwk = {**sender_sk.as_dict(), "use": "enc"}
recipient_jwk = {**recipient_sk.as_dict(), "use": "enc"}

sender_did = create_jwk_did(sender_jwk)
recipient_did = create_jwk_did(recipient_jwk)

# Add keys to secrets manager with proper kids
sender_secret = AuthlibSecretKey(sender_sk, f"{sender_did}#0")
recipient_secret = AuthlibSecretKey(recipient_sk, f"{recipient_did}#0")

await secrets.add_secret(sender_secret)
await secrets.add_secret(recipient_secret)

# Set up crypto and resolver
crypto = AuthlibCryptoService()
resolver = PrefixResolver({"did:jwk": JWKResolver()})
packer = PackagingService()

message = b"Hello, secure world!"

# Pack the message using authenticated encryption (ECDH-1PU)
# Requires both sender and recipient to have key agreement keys
packed = await packer.pack(
crypto=crypto,
resolver=resolver,
secrets=secrets,
message=message,
to=[recipient_did],
frm=sender_did,
)
print("Packed message:")
print(json.dumps(json.loads(packed), indent=2))

# Flush secrets to file
await secrets.flush()

# Show the contents of the secrets file
print("\nSecrets file contents:")
with open(secrets_path) as f:
for line in f:
print(line.strip())

# Create a new secrets manager that loads from the file
# This exercises the deserializer
print("\n--- Creating new secrets manager from file ---")
secrets2 = FileBasedSecretsManager(secrets_path, serialize_secret, deserialize_secret)

# Unpack the message using the newly loaded secrets
plaintext, metadata = await packer.unpack(
crypto=crypto,
resolver=resolver,
secrets=secrets2,
enc_message=packed,
)
print("\nUnpacked message:")
print(plaintext)

# Verify the message matches
assert plaintext == message
print("\nSuccess! Round-trip completed with deserialized secrets.")

# Clean up
Path(secrets_path).unlink()


if __name__ == "__main__":
asyncio.run(main())
42 changes: 42 additions & 0 deletions examples/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import subprocess
from pathlib import Path

import pytest


def pytest_collect_file(parent, file_path: Path):
"""Collect Python files in examples/ as test items."""
if file_path.suffix == ".py" and file_path.name not in ("conftest.py",):
return ExampleFile.from_parent(parent, path=file_path)


class ExampleFile(pytest.File):
"""pytest collector for example scripts."""

def collect(self):
yield ExampleItem.from_parent(self, name=self.path.stem)


class ExampleItem(pytest.Item):
"""pytest item that runs an example script."""

def runtest(self):
result = subprocess.run(
["python", str(self.path)],
capture_output=True,
text=True,
cwd=self.path.parent,
)
if result.returncode != 0:
raise ExampleFailedError(
f"Example {self.name} failed with code {result.returncode}\n"
f"stdout: {result.stdout}\n"
f"stderr: {result.stderr}"
)

def reportinfo(self):
return self.path, 0, f"Example: {self.path.name}"


class ExampleFailedError(Exception):
"""Raised when an example script fails to run."""
File renamed without changes.
Loading
Loading