Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<!-- Application Layer -->
<PackageVersion Include="MediatR" Version="12.4.1" />
<PackageVersion Include="FluentValidation" Version="11.10.0" />
<PackageVersion Include="FluentValidation.DependencyInjectionExtensions" Version="11.10.0" />

<!-- Infrastructure Layer - EF Core -->
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="8.0.10" />
Expand All @@ -22,8 +23,9 @@
<PackageVersion Include="Amazon.Lambda.SQSEvents" Version="2.2.0" />
<PackageVersion Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.3" />

<!-- Dependency Injection -->
<!-- Dependency Injection & Logging -->
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" />

<!-- Testing Frameworks -->
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
Expand Down
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,6 @@ See [`.cursor/rules/ai-agent.mdc`](.cursor/rules/ai-agent.mdc) for complete codi

## Next Steps

### Phase 2: Domain Layer
- Implement `Lead` entity with validation rules
- Create `ILeadRepository` interface

### Phase 3: Application Layer
- Create DTOs and MediatR commands
- Implement FluentValidation validators
Expand Down
62 changes: 62 additions & 0 deletions src/LeadProcessor.Application/Commands/ProcessLeadCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using MediatR;

namespace LeadProcessor.Application.Commands;

/// <summary>
/// Command to process a lead received from the SQS queue.
/// Implements MediatR's IRequest interface for CQRS pattern.
/// </summary>
public record ProcessLeadCommand : IRequest<Unit>
{
/// <summary>
/// Gets the tenant identifier for multi-tenancy support.
/// </summary>
public required string TenantId { get; init; }

/// <summary>
/// Gets the correlation identifier for idempotency and message tracking.
/// </summary>
public required string CorrelationId { get; init; }

/// <summary>
/// Gets the email address of the lead.
/// </summary>
public required string Email { get; init; }

/// <summary>
/// Gets the first name of the lead.
/// </summary>
public string? FirstName { get; init; }

/// <summary>
/// Gets the last name of the lead.
/// </summary>
public string? LastName { get; init; }

/// <summary>
/// Gets the phone number of the lead.
/// </summary>
public string? Phone { get; init; }

/// <summary>
/// Gets the company name of the lead.
/// </summary>
public string? Company { get; init; }

/// <summary>
/// Gets the source from which the lead originated.
/// </summary>
public required string Source { get; init; }

/// <summary>
/// Gets the metadata as a JSON string containing additional information.
/// </summary>
public string? Metadata { get; init; }

/// <summary>
/// Gets the ISO 8601 formatted timestamp when the message was sent.
/// Used for message tracking and debugging.
/// </summary>
public string? MessageTimestamp { get; init; }
}

61 changes: 61 additions & 0 deletions src/LeadProcessor.Application/DTOs/LeadCreatedEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
namespace LeadProcessor.Application.DTOs;

/// <summary>
/// Represents a lead creation event received from the SQS message queue.
/// This DTO maps to the message structure sent by the PHP gateway.
/// </summary>
public record LeadCreatedEvent
{
/// <summary>
/// Gets the tenant identifier for multi-tenancy support.
/// </summary>
public required string TenantId { get; init; }

/// <summary>
/// Gets the correlation identifier for idempotency and message tracking.
/// Must be unique per message to prevent duplicate processing.
/// </summary>
public required string CorrelationId { get; init; }

/// <summary>
/// Gets the email address of the lead.
/// </summary>
public required string Email { get; init; }

/// <summary>
/// Gets the first name of the lead.
/// </summary>
public string? FirstName { get; init; }

/// <summary>
/// Gets the last name of the lead.
/// </summary>
public string? LastName { get; init; }

/// <summary>
/// Gets the phone number of the lead.
/// </summary>
public string? Phone { get; init; }

/// <summary>
/// Gets the company name of the lead.
/// </summary>
public string? Company { get; init; }

/// <summary>
/// Gets the source from which the lead originated (e.g., website, mobile app, referral).
/// </summary>
public required string Source { get; init; }

/// <summary>
/// Gets the metadata as a JSON string containing additional information about the lead.
/// </summary>
public string? Metadata { get; init; }

/// <summary>
/// Gets the ISO 8601 formatted timestamp when the message was sent.
/// This will be parsed to DateTimeOffset in the handler.
/// </summary>
public string? MessageTimestamp { get; init; }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using FluentValidation;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection;

namespace LeadProcessor.Application.DependencyInjection;

/// <summary>
/// Extension methods for configuring application services in the dependency injection container.
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Adds Application layer services to the dependency injection container.
/// This includes MediatR, FluentValidation, and all command handlers and validators.
/// </summary>
/// <param name="services">The service collection to add services to.</param>
/// <returns>The service collection for method chaining.</returns>
public static IServiceCollection AddApplicationServices(this IServiceCollection services)
{
var assembly = Assembly.GetExecutingAssembly();

// Register MediatR with all handlers from this assembly
services.AddMediatR(config =>
{
config.RegisterServicesFromAssembly(assembly);
});

// Register FluentValidation validators from this assembly
services.AddValidatorsFromAssembly(assembly);

return services;
}
}

142 changes: 142 additions & 0 deletions src/LeadProcessor.Application/Handlers/ProcessLeadCommandHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
using FluentValidation;
using LeadProcessor.Application.Commands;
using LeadProcessor.Domain.Entities;
using LeadProcessor.Domain.Exceptions;
using LeadProcessor.Domain.Repositories;
using LeadProcessor.Domain.Services;
using MediatR;
using Microsoft.Extensions.Logging;
using System.Globalization;

namespace LeadProcessor.Application.Handlers;

/// <summary>
/// Handler for processing lead commands.
/// Implements idempotency, validation, and persistence logic for incoming leads.
/// </summary>
public class ProcessLeadCommandHandler(
ILeadRepository repository,
IDateTimeProvider dateTimeProvider,
IValidator<ProcessLeadCommand> validator,
ILogger<ProcessLeadCommandHandler> logger) : IRequestHandler<ProcessLeadCommand, Unit>
{
/// <summary>
/// Handles the ProcessLeadCommand by validating, checking for duplicates, and persisting the lead.
/// </summary>
/// <param name="request">The command containing lead data.</param>
/// <param name="cancellationToken">Cancellation token to cancel the operation.</param>
/// <returns>Unit value indicating successful completion.</returns>
/// <exception cref="ValidationException">Thrown when the command fails validation.</exception>
/// <exception cref="DuplicateLeadException">Thrown when a lead with the same correlation ID already exists.</exception>
public async Task<Unit> Handle(ProcessLeadCommand request, CancellationToken cancellationToken)
{
logger.LogInformation(
"Processing lead command for correlation ID {CorrelationId}, tenant {TenantId}, email {Email}",
request.CorrelationId,
request.TenantId,
request.Email);

try
{
// 1. Validate the command
var validationResult = await validator.ValidateAsync(request, cancellationToken);
if (!validationResult.IsValid)
{
logger.LogWarning(
"Validation failed for correlation ID {CorrelationId}: {Errors}",
request.CorrelationId,
string.Join(", ", validationResult.Errors.Select(e => e.ErrorMessage)));

throw new ValidationException(validationResult.Errors);
}

// 2. Check for idempotency - prevent duplicate processing
var exists = await repository.ExistsByCorrelationIdAsync(request.CorrelationId, cancellationToken);
if (exists)
{
logger.LogInformation(
"Lead with correlation ID {CorrelationId} already exists. Skipping duplicate processing.",
request.CorrelationId);

throw new DuplicateLeadException(request.CorrelationId);
}

// 3. Get current timestamp for entity creation
var now = dateTimeProvider.UtcNow;

// 4. Parse message timestamp for future audit trail enhancement
DateTimeOffset? messageTimestamp = null;
if (!string.IsNullOrWhiteSpace(request.MessageTimestamp))
{
if (DateTimeOffset.TryParse(
request.MessageTimestamp,
CultureInfo.InvariantCulture,
DateTimeStyles.AssumeUniversal,
out var parsedTimestamp))
{
messageTimestamp = parsedTimestamp;
logger.LogDebug(
"Parsed message timestamp {MessageTimestamp} for correlation ID {CorrelationId}",
messageTimestamp,
request.CorrelationId);
}
else
{
logger.LogWarning(
"Failed to parse message timestamp '{MessageTimestamp}' for correlation ID {CorrelationId}",
request.MessageTimestamp,
request.CorrelationId);
}
}

// 5. Map command to domain entity
var lead = new Lead
{
TenantId = request.TenantId,
CorrelationId = request.CorrelationId,
Email = request.Email,
FirstName = request.FirstName,
LastName = request.LastName,
Phone = request.Phone,
Company = request.Company,
Source = request.Source,
Metadata = request.Metadata,
CreatedAt = now,
UpdatedAt = now
};

// 6. Persist to repository
var savedLead = await repository.SaveLeadAsync(lead, cancellationToken);

logger.LogInformation(
"Successfully processed lead {LeadId} for correlation ID {CorrelationId}, tenant {TenantId}",
savedLead.Id,
savedLead.CorrelationId,
savedLead.TenantId);

return Unit.Value;
}
catch (ValidationException)
{
// Re-throw validation exceptions without logging as error
// (already logged as warning above)
throw;
}
catch (DuplicateLeadException)
{
// Re-throw duplicate exceptions without logging as error
// (already logged as information above - this is expected behavior)
throw;
}
catch (Exception ex)
{
logger.LogError(
ex,
"Unexpected error processing lead for correlation ID {CorrelationId}, tenant {TenantId}",
request.CorrelationId,
request.TenantId);
throw;
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

<ItemGroup>
<PackageReference Include="FluentValidation" />
<PackageReference Include="FluentValidation.DependencyInjectionExtensions" />
<PackageReference Include="MediatR" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading