diff --git a/phpunit.xml b/phpunit.xml index 3c5e38b..5030974 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -31,6 +31,7 @@ src/Exceptions/Handler.php src/Libraries/Kafka.php + src/Libraries/Kafka/Kafka.php src/Libraries/KafkaCallable.php src/Libraries/Storage.php src/Commands diff --git a/src/Commands/KafkaSchemaMakeCommand.php b/src/Commands/KafkaSchemaMakeCommand.php new file mode 100644 index 0000000..a3ad5ef --- /dev/null +++ b/src/Commands/KafkaSchemaMakeCommand.php @@ -0,0 +1,308 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Commands; + +use Illuminate\Console\GeneratorCommand; +use Illuminate\Contracts\Filesystem\FileNotFoundException; +use JsonException; +use Symfony\Component\Console\Input\InputOption; + +/** + * KafkaSchemaMakeCommand + * + * Custom command for Kafka schema + * + * @category Console + * @package Commands + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class KafkaSchemaMakeCommand extends GeneratorCommand +{ + /** + * The console command name. + * + * @var string + */ + protected $name = 'make:kafka-schema'; + + /** + * The console command description. + * + * @var string + */ + protected $description = 'Create a new Kafka schema model class'; + + /** + * The type of class being generated. + * + * @var string + */ + protected $type = 'Kafka Schema Model'; + + /** + * Schema input from user + * + * @var array + */ + protected array $schemaData = []; + + /** + * Schema type + * + * @var string + */ + protected string $schemaType = 'avro'; + + /** + * Execute the console command. + * + * @return int + * @throws FileNotFoundException + */ + public function handle(): int + { + $this->schemaType = $this->option('type') ?? 'avro'; + + if (!in_array($this->schemaType, ['avro', 'json'])) { + $this->error('Invalid schema type. Must be either "avro" or "json".'); + return 1; + } + + $schemaInput = $this->ask('Paste your ' . strtoupper($this->schemaType) . ' schema (single line)'); + + try { + $this->schemaData = json_decode($schemaInput, true, 512, JSON_THROW_ON_ERROR); + } catch (JsonException $e) { + $this->error('Invalid JSON schema: ' . $e->getMessage()); + return 1; + } + + parent::handle(); + $this->createCollection(); + return 0; + } + + /** + * Get the stub file for the generator. + * + * @return string + */ + protected function getStub(): string + { + if ($this->schemaType === 'json') { + return __DIR__ . '/stubs/kafka.schema.json.stub'; + } + + return __DIR__ . '/stubs/kafka.schema.avro.stub'; + } + + /** + * Build the class with the given name. + * + * @param string $name name of the class + * + * @return string + * @throws FileNotFoundException + */ + protected function buildClass($name): string + { + $stub = parent::buildClass($name); + + return $this->replaceSchema($stub); + } + + /** + * Replace schema placeholders in stub + * + * @param string $stub stub content + * + * @return string + */ + protected function replaceSchema(string $stub): string + { + $className = class_basename($this->argument('name')); + + if ($this->schemaType === 'avro') { + $fields = $this->schemaData['fields'] ?? []; + $fillable = $this->generateFillableFromAvro($fields); + $casts = $this->generateCastsFromAvro($fields); + $schemaName = $this->schemaData['name'] ?? 'value_Schema' . str_replace('CDC', '', $className); + $namespace = $this->schemaData['namespace'] ?? strtolower(preg_replace('/(?schemaData['properties'] ?? []; + $fillable = $this->generateFillableFromJson($properties); + $casts = $this->generateCastsFromJson($properties); + $schemaName = 'Schema' . str_replace('CDC', '', $className); + $namespace = strtolower(preg_replace('/(?formatSchemaBody(), $stub); + $stub = str_replace('DummySchemaName', $schemaName, $stub); + $stub = str_replace('DummySchemaNamespace', $namespace, $stub); + + return $stub; + } + + /** + * Generate fillable array from AVRO fields + * + * @param array $fields AVRO fields + * + * @return string + */ + protected function generateFillableFromAvro(array $fields): string + { + $fillable = array_map(fn($field) => " '{$field['name']}'", $fields); + return implode(",\n", $fillable); + } + + /** + * Generate casts array from AVRO fields + * + * @param array $fields AVRO fields + * + * @return string + */ + protected function generateCastsFromAvro(array $fields): string + { + $casts = []; + foreach ($fields as $field) { + $type = is_array($field['type']) ? $field['type'][1] ?? 'string' : $field['type']; + $phpType = $this->mapAvroTypeToPhp($type); + $casts[] = " '{$field['name']}' => '{$phpType}'"; + } + return implode(",\n", $casts); + } + + /** + * Generate fillable array from JSON properties + * + * @param array $properties JSON properties + * + * @return string + */ + protected function generateFillableFromJson(array $properties): string + { + $fillable = array_map(fn($name) => " '{$name}'", array_keys($properties)); + return implode(",\n", $fillable); + } + + /** + * Generate casts array from JSON properties + * + * @param array $properties JSON properties + * + * @return string + */ + protected function generateCastsFromJson(array $properties): string + { + $casts = []; + foreach ($properties as $name => $definition) { + $type = $definition['type'] ?? 'string'; + $phpType = $this->mapJsonTypeToPhp($type); + $casts[] = " '{$name}' => '{$phpType}'"; + } + return implode(",\n", $casts); + } + + /** + * Map AVRO type to PHP cast type + * + * @param string $avroType AVRO type + * + * @return string + */ + protected function mapAvroTypeToPhp(string $avroType): string + { + return match ($avroType) { + 'int', 'long' => 'integer', + 'float', 'double' => 'float', + 'boolean' => 'boolean', + 'string' => 'string', + default => 'string', + }; + } + + /** + * Map JSON type to PHP cast type + * + * @param string $jsonType JSON type + * + * @return string + */ + protected function mapJsonTypeToPhp(string $jsonType): string + { + return match ($jsonType) { + 'integer' => 'integer', + 'number' => 'float', + 'boolean' => 'boolean', + 'string' => 'string', + default => 'string', + }; + } + + /** + * Format schema body for output + * + * @return string + */ + protected function formatSchemaBody(): string + { + $schema = json_encode($this->schemaData, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES); + $lines = explode("\n", $schema); + $indented = array_map(fn($line) => ' ' . $line, $lines); + return implode("\n", $indented); + } + + /** + * Create a collection file for the model. + * + * @return void + */ + protected function createCollection(): void + { + $className = class_basename($this->argument('name')); + $this->call('make:collection', ['name' => $className]); + } + + /** + * Get the default namespace for the class. + * + * @param string $rootNamespace root namespace (generally App) + * + * @return string + */ + protected function getDefaultNamespace($rootNamespace): string + { + return $rootNamespace . '\Models'; + } + + /** + * Get the console command options. + * + * @return array + */ + protected function getOptions(): array + { + return [ + ['type', null, InputOption::VALUE_OPTIONAL, 'Schema type (avro or json)', 'avro'], + ]; + } +} \ No newline at end of file diff --git a/src/Commands/stubs/kafka.schema.avro.stub b/src/Commands/stubs/kafka.schema.avro.stub new file mode 100644 index 0000000..e4f611d --- /dev/null +++ b/src/Commands/stubs/kafka.schema.avro.stub @@ -0,0 +1,85 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +use Jobcloud\Kafka\Message\Decoder\DecoderInterface; +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; +use Spotlibs\PhpLib\Exceptions\ParameterException; + +/** + * JsonSchemaDecoder + * + * Decodes Kafka consumer messages encoded with JSON Schema Confluent wire format. + * Strips the wire format header (magic byte + schema ID), fetches the schema from + * the registry, validates the JSON payload, and returns the decoded array. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class JsonSchemaDecoder implements DecoderInterface +{ + /** + * JsonSchemaDecoder constructor + * + * @param JsonSchemaRegistry $registry Schema registry client + */ + public function __construct(private JsonSchemaRegistry $registry) + { + } + + /** + * Decode a Kafka consumer message with JSON Schema wire format + * + * @param KafkaConsumerMessageInterface $consumerMessage Incoming Kafka message + * + * @return KafkaConsumerMessageInterface Decoded message with array body + * @throws ParameterException + */ + public function decode(KafkaConsumerMessageInterface $consumerMessage): KafkaConsumerMessageInterface + { + $body = $consumerMessage->getBody(); + + if (!is_string($body) || strlen($body) <= 5) { + return $consumerMessage; + } + + if (ord($body[0]) !== 0) { + return $consumerMessage; + } + + $schemaId = unpack('N', substr($body, 1, 4))[1]; + $jsonPayload = substr($body, 5); + + // Fetch schema for validation + $this->registry->getSchemaById($schemaId); + + $decoded = json_decode($jsonPayload, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + throw new ParameterException('JSON Schema decode failed: invalid JSON payload'); + } + + return new KafkaDecodedMessage($consumerMessage, $decoded); + } + + /** + * Attempt to decode a wire-format message as JSON Schema + * + * Returns null if the payload after the header is not valid JSON, + * indicating it may be Avro-encoded instead. + * + * @param string $body Raw message body bytes + * + * @return array|null Decoded array or null if not valid JSON + */ + public static function tryDecode(string $body): ?array + { + if (strlen($body) <= 5 || ord($body[0]) !== 0) { + return null; + } + + $jsonPayload = substr($body, 5); + $decoded = json_decode($jsonPayload, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + return null; + } + + return $decoded; + } +} diff --git a/src/Libraries/Kafka/JsonSchemaEncoder.php b/src/Libraries/Kafka/JsonSchemaEncoder.php new file mode 100644 index 0000000..bc15529 --- /dev/null +++ b/src/Libraries/Kafka/JsonSchemaEncoder.php @@ -0,0 +1,211 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +use Jobcloud\Kafka\Message\Encoder\EncoderInterface; +use Jobcloud\Kafka\Message\KafkaProducerMessageInterface; +use Spotlibs\PhpLib\Exceptions\ParameterException; + +/** + * JsonSchemaEncoder + * + * Encodes Kafka producer messages using JSON Schema with Confluent wire format. + * Registers the schema with the Schema Registry, validates the payload, and + * prepends the wire format header (magic byte + schema ID) to the JSON payload. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class JsonSchemaEncoder implements EncoderInterface +{ + /** + * Confluent wire format magic byte + */ + private const MAGIC_BYTE = "\x00"; + + /** + * JsonSchemaEncoder constructor + * + * @param JsonSchemaRegistry $registry Schema registry client + * @param KafkaJsonSchema $bodySchema Body schema definition + * @param KafkaJsonSchema|null $keySchema Key schema definition (optional) + */ + public function __construct( + private JsonSchemaRegistry $registry, + private KafkaJsonSchema $bodySchema, + private ?KafkaJsonSchema $keySchema = null + ) { + } + + /** + * Encode a Kafka producer message with JSON Schema wire format + * + * @param KafkaProducerMessageInterface $producerMessage Message to encode + * + * @return KafkaProducerMessageInterface Encoded message with wire format body + * @throws ParameterException + */ + public function encode(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface + { + $body = $producerMessage->getBody(); + $encodedBody = $this->encodePayload($body, $this->bodySchema); + $producerMessage = $producerMessage->withBody($encodedBody); + + if ($this->keySchema !== null && $producerMessage->getKey() !== null) { + $key = $producerMessage->getKey(); + $keyPayload = is_string($key) ? $key : json_encode($key); + $encodedKey = $this->encodeRawPayload($keyPayload, $this->keySchema); + $producerMessage = $producerMessage->withKey($encodedKey); + } + + return $producerMessage; + } + + /** + * Encode a payload value with schema validation and wire format + * + * @param mixed $payload Payload data (array or string) + * @param KafkaJsonSchema $schema Schema to validate and encode against + * + * @return string Wire-format encoded string + * @throws ParameterException + */ + private function encodePayload(mixed $payload, KafkaJsonSchema $schema): string + { + $jsonString = is_string($payload) ? $payload : json_encode($payload); + + $this->validatePayload($jsonString, $schema->getDefinition()); + + return $this->encodeRawPayload($jsonString, $schema); + } + + /** + * Prepend Confluent wire format header to a JSON payload + * + * @param string $jsonPayload JSON string payload + * @param KafkaJsonSchema $schema Schema (must have resolved ID) + * + * @return string Wire-format bytes + * @throws ParameterException + */ + private function encodeRawPayload(string $jsonPayload, KafkaJsonSchema $schema): string + { + $schemaId = $schema->getSchemaId(); + + if ($schemaId === null) { + $schemaId = $this->registry->register($schema->getSubject(), $schema->getDefinition()); + $schema->setSchemaId($schemaId); + } + + return self::MAGIC_BYTE . pack('N', $schemaId) . $jsonPayload; + } + + /** + * Validate a JSON payload against a JSON Schema definition + * + * @param string $jsonPayload JSON string to validate + * @param string $schemaDefinition JSON Schema definition string + * + * @return void + * @throws ParameterException + */ + private function validatePayload(string $jsonPayload, string $schemaDefinition): void + { + $data = json_decode($jsonPayload); + $schema = json_decode($schemaDefinition); + + if ($schema === null) { + throw new ParameterException('Invalid JSON Schema definition'); + } + + if ($data === null && $jsonPayload !== 'null') { + throw new ParameterException('Invalid JSON payload'); + } + + // Validate required fields if schema defines them + if (isset($schema->required) && is_array($schema->required) && is_object($data)) { + foreach ($schema->required as $field) { + if (!property_exists($data, $field)) { + throw new ParameterException("JSON Schema validation failed: missing required field '{$field}'"); + } + } + } + + // Validate property types if schema defines them + if (isset($schema->properties) && is_object($data)) { + foreach ($schema->properties as $prop => $propSchema) { + if (!property_exists($data, $prop)) { + continue; + } + $this->validateType($data->$prop, $propSchema, $prop); + } + } + } + + /** + * Validate a value against its declared JSON Schema type + * + * @param mixed $value Value to check + * @param object $propSchema Property schema definition + * @param string $field Field name for error messages + * + * @return void + * @throws ParameterException + */ + private function validateType(mixed $value, object $propSchema, string $field): void + { + if (!isset($propSchema->type)) { + return; + } + + $types = is_array($propSchema->type) ? $propSchema->type : [$propSchema->type]; + + foreach ($types as $type) { + if ($this->matchesType($value, $type)) { + return; + } + } + + throw new ParameterException( + "JSON Schema validation failed: field '{$field}' does not match type(s) " . json_encode($propSchema->type) + ); + } + + /** + * Check if a value matches a JSON Schema type + * + * @param mixed $value Value to check + * @param string $type JSON Schema type name + * + * @return bool + */ + private function matchesType(mixed $value, string $type): bool + { + return match ($type) { + 'string' => is_string($value), + 'integer' => is_int($value), + 'number' => is_int($value) || is_float($value), + 'boolean' => is_bool($value), + 'array' => is_array($value), + 'object' => is_object($value), + 'null' => $value === null, + default => true, + }; + } +} diff --git a/src/Libraries/Kafka/JsonSchemaRegistry.php b/src/Libraries/Kafka/JsonSchemaRegistry.php new file mode 100644 index 0000000..f14c1e6 --- /dev/null +++ b/src/Libraries/Kafka/JsonSchemaRegistry.php @@ -0,0 +1,139 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +use GuzzleHttp\Client as GuzzleClient; +use Spotlibs\PhpLib\Exceptions\ParameterException; + +/** + * JsonSchemaRegistry + * + * Manages JSON Schema registration and retrieval from Confluent Schema Registry. + * Handles the REST API calls for registering schemas under subjects and fetching + * schemas by ID. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class JsonSchemaRegistry +{ + /** + * In-memory cache of schema ID → schema definition + * + * @var array + */ + private array $schemaCache = []; + + /** + * JsonSchemaRegistry constructor + * + * @param GuzzleClient $client Guzzle HTTP client configured with base_uri and auth + */ + public function __construct(private GuzzleClient $client) + { + } + + /** + * Register a JSON Schema under a subject and return the schema ID + * + * @param string $subject Subject name (e.g. "topic-value") + * @param string $definition JSON Schema definition string + * + * @return int The schema ID assigned by the registry + * @throws ParameterException + */ + public function register(string $subject, string $definition): int + { + $payload = json_encode( + [ + 'schemaType' => 'JSON', + 'schema' => $definition, + ] + ); + + try { + $response = $this->client->post( + sprintf('subjects/%s/versions', $subject), + [ + 'headers' => [ + 'Content-Type' => 'application/vnd.schemaregistry.v1+json', + 'Accept' => 'application/vnd.schemaregistry.v1+json', + ], + 'body' => $payload, + ] + ); + + $body = json_decode($response->getBody()->getContents(), true); + + if (!isset($body['id'])) { + throw new ParameterException('Schema Registry did not return a schema ID'); + } + + $schemaId = (int) $body['id']; + $this->schemaCache[$schemaId] = $definition; + + return $schemaId; + } catch (ParameterException $e) { + throw $e; + } catch (\Throwable $e) { + throw new ParameterException('Failed to register JSON Schema: ' . $e->getMessage()); + } + } + + /** + * Fetch a JSON Schema definition by its schema ID + * + * @param int $schemaId Schema ID + * + * @return string JSON Schema definition string + * @throws ParameterException + */ + public function getSchemaById(int $schemaId): string + { + if (isset($this->schemaCache[$schemaId])) { + return $this->schemaCache[$schemaId]; + } + + try { + $response = $this->client->get( + sprintf('schemas/ids/%d', $schemaId), + [ + 'headers' => [ + 'Accept' => 'application/vnd.schemaregistry.v1+json', + ], + ] + ); + + $body = json_decode($response->getBody()->getContents(), true); + + if (!isset($body['schema'])) { + throw new ParameterException("Schema not found for ID {$schemaId}"); + } + + $definition = $body['schema']; + $this->schemaCache[$schemaId] = $definition; + + return $definition; + } catch (ParameterException $e) { + throw $e; + } catch (\Throwable $e) { + throw new ParameterException('Failed to fetch JSON Schema: ' . $e->getMessage()); + } + } +} diff --git a/src/Libraries/Kafka/Kafka.php b/src/Libraries/Kafka/Kafka.php new file mode 100644 index 0000000..529e4c0 --- /dev/null +++ b/src/Libraries/Kafka/Kafka.php @@ -0,0 +1,858 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +use AvroSchema; +use FlixTech\AvroSerializer\Objects\RecordSerializer; +use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; +use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; +use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; +use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; +use GuzzleHttp\Client as GuzzleClient; +use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder; +use Jobcloud\Kafka\Consumer\KafkaConsumerBuilderInterface; +use Jobcloud\Kafka\Consumer\KafkaConsumerInterface; +use Jobcloud\Kafka\Message\Decoder\AvroDecoder; +use Jobcloud\Kafka\Message\Encoder\AvroEncoder; +use Jobcloud\Kafka\Message\KafkaAvroSchema; +use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface; +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; +use Jobcloud\Kafka\Message\KafkaProducerMessage; +use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry; +use Jobcloud\Kafka\Producer\KafkaProducer; +use Jobcloud\Kafka\Producer\KafkaProducerBuilder; +use Spotlibs\PhpLib\Exceptions\ParameterException; +use Spotlibs\PhpLib\Logs\Log; + +/** + * Kafka + * + * Kafka producer/consumer client for publishing and consuming messages with + * schema support. Use the Kafka() helper to retrieve an instance. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class Kafka +{ + /** + * Schema type: No schema, no encoding (raw data) + */ + public const SCHEMALESS = 1; + + /** + * Schema type: JSON schema with validation and encoding + */ + public const JSON_SCHEMA = 2; + + /** + * Schema type: Avro schema with registry and encoding + */ + public const AVRO_SCHEMA = 3; + + /** + * Schema type: Auto-detect -> Avro if magic byte (\x00) present, else raw + */ + public const AUTO_SCHEMA = 4; + + + /** + * Current producer instance + * + * @var KafkaProducer|null + */ + private ?KafkaProducer $producer = null; + + /** + * Current consumer instance + * + * @var KafkaConsumerInterface|null + */ + private ?KafkaConsumerInterface $consumer = null; + + /** + * Current topic name + * + * @var string|null + */ + private ?string $currentTopic = null; + + /** + * Setup and return Kafka producer + * + * @param string $topic Topic name + * @param int $schemaType Schema type constant + * @param string|null $schemaBody Schema definition for message body + * @param string|null $schemaKey Schema definition for message key + * @param array $additionalConfig Additional Kafka configuration + * + * @return KafkaProducer + */ + public function publishOn( + string $topic, + int $schemaType, + ?string $schemaBody = null, + ?string $schemaKey = null, + array $additionalConfig = [] + ): KafkaProducer { + $this->validateEnvironment(); + $this->currentTopic = $topic; + + $producerBuilder = $this->createProducerBuilder($additionalConfig); + + if ($schemaType === self::AVRO_SCHEMA) { + $encoder = $this->createAvroEncoder($topic, $schemaBody, $schemaKey); + $producerBuilder = $producerBuilder->withEncoder($encoder); + } elseif ($schemaType === self::JSON_SCHEMA) { + $encoder = $this->createJsonSchemaEncoder($topic, $schemaBody, $schemaKey); + $producerBuilder = $producerBuilder->withEncoder($encoder); + } + + $this->producer = $producerBuilder->build(); + + Log::runtime()->info( + [ + 'operation' => 'kafka_producer_initialized', + 'topic' => $topic, + 'schemaType' => $schemaType + ] + ); + + return $this->producer; + } + + /** + * Produce a message to the current topic + * + * @param mixed $body Message body + * @param string|null $key Message key for partitioning + * @param int $partition Partition number (default 0) + * + * @return void + * @throws ParameterException + */ + public function produce(mixed $body, ?string $key = null, int $partition = 0): void + { + $this->ensureProducerInitialized(); + + $message = KafkaProducerMessage::create($this->currentTopic, $partition) + ->withBody($body); + + if ($key !== null) { + $message = $message->withKey($key); + } + + $this->producer->produce($message); + } + + /** + * Produce a message with custom headers + * + * @param mixed $body Message body + * @param array $headers Message headers (key-value pairs) + * @param string|null $key Message key for partitioning + * @param int $partition Partition number (default 0) + * + * @return void + * @throws ParameterException + */ + public function produceWithHeaders( + mixed $body, + array $headers, + ?string $key = null, + int $partition = 0 + ): void { + $this->ensureProducerInitialized(); + + $message = KafkaProducerMessage::create($this->currentTopic, $partition) + ->withBody($body) + ->withHeaders($headers); + + if ($key !== null) { + $message = $message->withKey($key); + } + + $this->producer->produce($message); + } + + /** + * Produce multiple messages in batch + * + * @param array $messages Array of messages with format: ['body' => mixed, 'key' => ?string, 'partition' => int] + * + * @return void + * @throws ParameterException + */ + public function produceBatch(array $messages): void + { + $this->ensureProducerInitialized(); + + foreach ($messages as $msg) { + $body = $msg['body'] ?? null; + $key = $msg['key'] ?? null; + $partition = $msg['partition'] ?? 0; + + if ($body === null) { + continue; + } + + $this->produce($body, $key, $partition); + } + } + + /** + * Flush all queued messages to Kafka + * + * @param int $timeoutMs Timeout in milliseconds (default 10000) + * + * @return void + * @throws ParameterException + */ + public function flush(int $timeoutMs = 10000): void + { + $this->ensureProducerInitialized(); + + $startTime = microtime(true); + $this->producer->flush($timeoutMs); + $elapsed = microtime(true) - $startTime; + + Log::runtime()->info( + [ + 'operation' => 'kafka_flush', + 'topic' => $this->currentTopic, + 'responseTime' => round($elapsed * 1000) + ] + ); + } + + /** + * Close producer and cleanup resources + * + * @param int $timeoutMs Timeout in milliseconds to flush remaining messages (default 10000) + * + * @return void + */ + public function close(int $timeoutMs = 10000): void + { + if ($this->producer !== null) { + $this->flush($timeoutMs); + $this->producer = null; + $this->currentTopic = null; + + Log::runtime()->info( + [ + 'operation' => 'kafka_producer_closed' + ] + ); + } + } + + /** + * Convenience method: publish single message immediately + * + * @param string $topic Topic name + * @param mixed $body Message body + * @param int $schemaType Schema type constant (default SCHEMALESS) + * @param string|null $schemaBody Schema definition for body + * @param string|null $schemaKey Schema definition for key + * @param string|null $key Message key + * @param int $partition Partition number (default 0) + * @param int $flushTimeoutMs Flush timeout in milliseconds (default 10000) + * + * @return void + * @throws ParameterException + */ + public function publish( + string $topic, + mixed $body, + int $schemaType = self::SCHEMALESS, + ?string $schemaBody = null, + ?string $schemaKey = null, + ?string $key = null, + int $partition = 0, + int $flushTimeoutMs = 10000 + ): void { + $this->publishOn($topic, $schemaType, $schemaBody, $schemaKey); + $this->produce($body, $key, $partition); + $this->flush($flushTimeoutMs); + } + + /** + * Validate required environment variables + * + * @return void + * @throws ParameterException + */ + private function validateEnvironment(): void + { + $required = [ + 'KAFKA_BROKERS_URL' => 'Kafka brokers URL', + 'KAFKA_USER_PRODUCE' => 'Kafka producer username', + 'KAFKA_PASS_PRODUCE' => 'Kafka producer password' + ]; + + foreach ($required as $env => $description) { + if (empty(env($env))) { + throw new ParameterException("Environment variable {$env} ({$description}) is not set"); + } + } + } + + /** + * Create producer builder with default configuration + * + * @param array $additionalConfig Additional configuration + * + * @return KafkaProducerBuilder + */ + private function createProducerBuilder(array $additionalConfig): KafkaProducerBuilder + { + $defaultConfig = [ + 'compression.codec' => 'lz4', + 'sasl.username' => env('KAFKA_USER_PRODUCE'), + 'sasl.password' => env('KAFKA_PASS_PRODUCE'), + 'sasl.mechanism' => 'PLAIN', + 'security.protocol' => 'SASL_SSL', + 'message.timeout.ms' => '10000', + 'socket.timeout.ms' => '10000' + ]; + + $config = array_merge($defaultConfig, $additionalConfig); + + return KafkaProducerBuilder::create() + ->withAdditionalConfig($config) + ->withAdditionalBroker(env('KAFKA_BROKERS_URL')) + ->withDeliveryReportCallback([$this, 'deliveryReportCallback']) + ->withErrorCallback([$this, 'errorCallback']) + ->withLogCallback([$this, 'logCallback']); + } + + /** + * Create Avro encoder with schema registry + * + * @param string $topic Topic name + * @param string|null $schemaBody Body schema definition + * @param string|null $schemaKey Key schema definition + * + * @return AvroEncoder + * @throws ParameterException + */ + private function createAvroEncoder( + string $topic, + ?string $schemaBody, + ?string $schemaKey + ): AvroEncoder { + if (empty(env('KAFKA_SCHEME_REGISTRY_URL'))) { + throw new ParameterException('Environment variable KAFKA_SCHEME_REGISTRY_URL is not set'); + } + + $cachedRegistry = new CachedRegistry( + new BlockingRegistry( + new PromisingRegistry( + new GuzzleClient( + [ + 'base_uri' => env('KAFKA_SCHEME_REGISTRY_URL'), + 'auth' => [env('KAFKA_USER_PRODUCE'), env('KAFKA_PASS_PRODUCE')] + ] + ) + ) + ), + new AvroObjectCacheAdapter() + ); + + $registry = new AvroSchemaRegistry($cachedRegistry); + $recordSerializer = new RecordSerializer( + $cachedRegistry, + [ + RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => true, + RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => true, + ] + ); + + if ($schemaBody !== null) { + $registry->addBodySchemaMappingForTopic( + $topic, + new KafkaAvroSchema( + $topic . '-value', + KafkaAvroSchemaInterface::LATEST_VERSION, + AvroSchema::parse($schemaBody) + ) + ); + } + + if ($schemaKey !== null) { + $registry->addKeySchemaMappingForTopic( + $topic, + new KafkaAvroSchema( + $topic . '-key', + KafkaAvroSchemaInterface::LATEST_VERSION, + AvroSchema::parse($schemaKey) + ) + ); + } + + return new AvroEncoder($registry, $recordSerializer); + } + + /** + * Create JSON Schema encoder with schema registry + * + * @param string $topic Topic name + * @param string|null $schemaBody Body schema definition (JSON Schema string) + * @param string|null $schemaKey Key schema definition (JSON Schema string) + * + * @return JsonSchemaEncoder + * @throws ParameterException + */ + private function createJsonSchemaEncoder( + string $topic, + ?string $schemaBody, + ?string $schemaKey + ): JsonSchemaEncoder { + if (empty(env('KAFKA_SCHEME_REGISTRY_URL'))) { + throw new ParameterException('Environment variable KAFKA_SCHEME_REGISTRY_URL is not set'); + } + + if ($schemaBody === null) { + throw new ParameterException('JSON Schema body definition is required for JSON_SCHEMA mode'); + } + + $registry = new JsonSchemaRegistry( + new GuzzleClient( + [ + 'base_uri' => env('KAFKA_SCHEME_REGISTRY_URL'), + 'auth' => [env('KAFKA_USER_PRODUCE'), env('KAFKA_PASS_PRODUCE')] + ] + ) + ); + + $bodySchema = new KafkaJsonSchema($topic . '-value', $schemaBody); + + $keySchema = null; + if ($schemaKey !== null) { + $keySchema = new KafkaJsonSchema($topic . '-key', $schemaKey); + } + + return new JsonSchemaEncoder($registry, $bodySchema, $keySchema); + } + + /** + * Create JSON Schema decoder with schema registry + * + * @return JsonSchemaDecoder + * @throws ParameterException + */ + private function createJsonSchemaDecoder(): JsonSchemaDecoder + { + if (empty(env('KAFKA_SCHEME_REGISTRY_URL'))) { + throw new ParameterException('Environment variable KAFKA_SCHEME_REGISTRY_URL is not set'); + } + + $registry = new JsonSchemaRegistry( + new GuzzleClient( + [ + 'base_uri' => env('KAFKA_SCHEME_REGISTRY_URL'), + 'auth' => [env('KAFKA_USER_CONSUME'), env('KAFKA_PASS_CONSUME')] + ] + ) + ); + + return new JsonSchemaDecoder($registry); + } + + /** + * Ensure producer is initialized + * + * @return void + * @throws ParameterException + */ + private function ensureProducerInitialized(): void + { + if ($this->producer === null) { + throw new ParameterException('Producer not initialized. Call publishOn() first.'); + } + } + + /** + * Delivery report callback + * + * @param mixed $kafka Kafka instance + * @param mixed $message Message + * + * @return void + */ + public function deliveryReportCallback(mixed $kafka, mixed $message): void + { + if ($message->err !== 0) { + Log::runtime()->error( + [ + 'operation' => 'kafka_delivery_failed', + 'error' => $message->errstr(), + 'topic' => $message->topic_name, + 'partition' => $message->partition + ] + ); + } + } + + /** + * Error callback + * + * @param mixed $kafka Kafka instance + * @param int $err Error code + * @param string $reason Error reason + * + * @return void + */ + public function errorCallback(mixed $kafka, int $err, string $reason): void + { + Log::runtime()->error( + [ + 'operation' => 'kafka_producer_error', + 'errorCode' => $err, + 'reason' => $reason + ] + ); + } + + /** + * Log callback + * + * @param mixed $kafka Kafka instance + * @param int $level Log level + * @param string $facility Facility + * @param string $message Log message + * + * @return void + */ + public function logCallback(mixed $kafka, int $level, string $facility, string $message): void + { + if ($level <= 3) { + Log::runtime()->warning( + [ + 'operation' => 'kafka_producer_log', + 'level' => $level, + 'facility' => $facility, + 'message' => $message + ] + ); + } + } + + /** + * Setup and return Kafka consumer + * + * @param string $topic Topic name + * @param int $schemaType Schema type constant + * @param string|null $consumerGroup Consumer group name + * @param array $additionalConfig Additional Kafka configuration + * + * @return KafkaConsumerInterface + * @throws ParameterException + * @throws \AvroIOException + */ + public function consumeOn( + string $topic, + int $schemaType, + ?string $consumerGroup = null, + array $additionalConfig = [] + ): KafkaConsumerInterface { + $this->validateConsumerEnvironment(); + $this->currentTopic = $topic; + + $consumerBuilder = $this->createConsumerBuilder($topic, $consumerGroup, $additionalConfig); + + if ($schemaType === self::AVRO_SCHEMA) { + $decoder = $this->createAvroDecoder($topic); + $consumerBuilder = $consumerBuilder->withDecoder($decoder); + } elseif ($schemaType === self::JSON_SCHEMA) { + $decoder = $this->createJsonSchemaDecoder(); + $consumerBuilder = $consumerBuilder->withDecoder($decoder); + } elseif ($schemaType === self::AUTO_SCHEMA) { + $avroDecoder = $this->createAvroDecoder($topic); + $jsonSchemaDecoder = $this->createJsonSchemaDecoder(); + $consumerBuilder = $consumerBuilder->withDecoder(new KafkaAutoDecoder($avroDecoder, $jsonSchemaDecoder)); + } + + $this->consumer = $consumerBuilder->build(); + $this->consumer->subscribe(); + + Log::runtime()->info( + [ + 'operation' => 'kafka_consumer_initialized', + 'topic' => $topic, + 'consumerGroup' => $consumerGroup, + 'schemaType' => $schemaType + ] + ); + + return $this->consumer; + } + + /** + * Consume single message + * + * @param int $timeoutMs Timeout in milliseconds + * @param ?string $topic Topic + * + * @return KafkaConsumerMessageInterface + */ + public function consume(int $timeoutMs = 10000, ?string $topic = null): KafkaConsumerMessageInterface + { + if ($this->consumer === null) { + if ($topic === null) { + throw new ParameterException('Consumer not initialized. Call consumeOn() first or provide a topic.'); + } + // auto-bind with AUTO_SCHEMA + $this->consumeOn($topic, self::AUTO_SCHEMA); + } + + return $this->consumer->consume($timeoutMs); + } + + /** + * Commit message offset + * + * @param mixed $message Message to commit + * + * @return void + * @throws ParameterException + */ + public function commit(mixed $message): void + { + if ($this->consumer === null) { + throw new ParameterException('Consumer not initialized. Call consumeOn() first.'); + } + + $this->consumer->commit($message); + } + + /** + * Validate required consumer environment variables + * + * @return void + * @throws ParameterException + */ + private function validateConsumerEnvironment(): void + { + $required = [ + 'KAFKA_BROKERS_URL' => 'Kafka brokers URL', + 'KAFKA_USER_CONSUME' => 'Kafka consumer username', + 'KAFKA_PASS_CONSUME' => 'Kafka consumer password' + ]; + + foreach ($required as $env => $description) { + if (empty(env($env))) { + throw new ParameterException("Environment variable {$env} ({$description}) is not set"); + } + } + } + + /** + * Subscribe to topics + * + * @return void + * @throws ParameterException + */ + public function subscribe(): void + { + if ($this->consumer === null) { + throw new ParameterException('Consumer not initialized. Call consumeOn() first.'); + } + + $this->consumer->subscribe(); + + Log::runtime()->info( + [ + 'operation' => 'kafka_consumer_subscribed', + 'topic' => $this->currentTopic + ] + ); + } + + /** + * Close consumer and cleanup resources + * + * @return void + */ + public function closeConsumer(): void + { + if ($this->consumer !== null) { + $this->consumer->unsubscribe(); + $this->consumer = null; + $this->currentTopic = null; + + Log::runtime()->info( + [ + 'operation' => 'kafka_consumer_closed' + ] + ); + } + } + + /** + * Create consumer builder with default configuration + * + * @param string $topic Topic name + * @param string|null $consumerGroup Consumer group + * @param array $additionalConfig Additional configuration + * + * @return KafkaConsumerBuilderInterface + */ + private function createConsumerBuilder( + string $topic, + ?string $consumerGroup, + array $additionalConfig + ): KafkaConsumerBuilderInterface { + $groupName = $consumerGroup ?? $topic . '_consumer_group'; + + $defaultConfig = [ + 'client.id' => env('APP_NAME') . '-' . gethostname(), + 'compression.codec' => 'lz4', + 'sasl.username' => env('KAFKA_USER_CONSUME'), + 'sasl.password' => env('KAFKA_PASS_CONSUME'), + 'sasl.mechanism' => 'PLAIN', + 'security.protocol' => 'SASL_SSL', + 'enable.auto.commit' => false, + 'message.timeout.ms' => '10000', + 'socket.timeout.ms' => '10000', + 'request.timeout.ms' => '60000' + ]; + + $config = array_merge($defaultConfig, $additionalConfig); + + return KafkaConsumerBuilder::create() + ->withAdditionalConfig($config) + ->withAdditionalBroker((string) env('KAFKA_BROKERS_URL')) + ->withConsumerGroup($groupName) + ->withAdditionalSubscription($topic, [], KafkaConsumerBuilderInterface::OFFSET_STORED) + ->withErrorCallback([$this, 'errorCallback']) + ->withRebalanceCallback([$this, 'rebalanceCallback']) + ->withConsumeCallback([$this, 'consumeCallback']) + ->withLogCallback([$this, 'logCallback']) + ->withOffsetCommitCallback([$this, 'offsetCommitCallback']); + } + + /** + * Create Avro decoder with schema registry + * + * @param string $topic Topic name + * + * @return AvroDecoder + * @throws ParameterException + * @throws \AvroIOException + */ + private function createAvroDecoder(string $topic): AvroDecoder + { + if (empty(env('KAFKA_SCHEME_REGISTRY_URL'))) { + throw new ParameterException('Environment variable KAFKA_SCHEME_REGISTRY_URL is not set'); + } + + $cachedRegistry = new CachedRegistry( + new BlockingRegistry( + new PromisingRegistry( + new GuzzleClient( + [ + 'base_uri' => env('KAFKA_SCHEME_REGISTRY_URL'), + 'auth' => [env('KAFKA_USER_CONSUME'), env('KAFKA_PASS_CONSUME')] + ] + ) + ) + ), + new AvroObjectCacheAdapter() + ); + + $registry = new AvroSchemaRegistry($cachedRegistry); + $recordSerializer = new RecordSerializer($cachedRegistry); + + // Fetch schemas from registry automatically + $registry->addBodySchemaMappingForTopic( + $topic, + new KafkaAvroSchema( + $topic . '-value', + KafkaAvroSchemaInterface::LATEST_VERSION + ) + ); + $registry->addKeySchemaMappingForTopic( + $topic, + new KafkaAvroSchema( + $topic . '-key', + KafkaAvroSchemaInterface::LATEST_VERSION + ) + ); + + return new AvroDecoder($registry, $recordSerializer); + } + + /** + * Rebalance callback + * + * @param mixed $kafka Kafka instance + * @param int $err Error code + * @param array $partitions Partitions + * + * @return void + */ + public function rebalanceCallback(mixed $kafka, int $err, array $partitions): void + { + $assignPartitions = defined('RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS') + ? RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS // Defined on C-level constant, comes from the `ext-rdkafka` + : -175; + + if ($err === $assignPartitions) { + $kafka->assign($partitions); + } else { + $kafka->assign(null); + } + } + + /** + * Consume callback + * + * @param mixed $message Message + * @param mixed $ctx Context + * + * @return void + */ + public function consumeCallback(mixed $message, mixed $ctx): void + { + // no-op: messages are handled by the caller via consume() + } + + /** + * Offset commit callback + * + * @param mixed $kafka Kafka instance + * @param int $err Error code + * @param array $topicPartitions Topic partitions + * + * @return void + */ + public function offsetCommitCallback(mixed $kafka, int $err, array $topicPartitions): void + { + if ($err !== 0) { + Log::runtime()->error( + [ + 'operation' => 'kafka_offset_commit_failed', + 'errorCode' => $err + ] + ); + } + } +} diff --git a/src/Libraries/Kafka/KafkaAutoDecoder.php b/src/Libraries/Kafka/KafkaAutoDecoder.php new file mode 100644 index 0000000..6fe39ba --- /dev/null +++ b/src/Libraries/Kafka/KafkaAutoDecoder.php @@ -0,0 +1,105 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +use Jobcloud\Kafka\Message\Decoder\AvroDecoder; +use Jobcloud\Kafka\Message\Decoder\DecoderInterface; +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; + +/** + * KafkaAutoDecoder + * + * Smart decoder that auto-detects message encoding. When the message starts with + * the Confluent wire format magic byte (0x00), it first attempts JSON Schema + * decoding (payload bytes 5+ are valid JSON), then falls back to Avro decoding. + * Messages without the magic byte are attempted as plain JSON, or returned raw. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class KafkaAutoDecoder implements DecoderInterface +{ + /** + * KafkaAutoDecoder constructor + * + * @param AvroDecoder $avroDecoder Avro decoder instance backed by schema registry + * @param JsonSchemaDecoder|null $jsonSchemaDecoder JSON Schema decoder instance (optional) + */ + public function __construct( + private AvroDecoder $avroDecoder, + private ?JsonSchemaDecoder $jsonSchemaDecoder = null + ) { + } + + /** + * Decode a Kafka consumer message + * + * Attempts decoding in this order for wire-format messages (magic byte 0x00): + * 1. JSON Schema (payload after header is valid JSON text) + * 2. Avro (binary payload after header) + * + * For non-wire-format messages: + * 3. Plain JSON (starts with { or [) + * 4. Raw passthrough + * + * @param KafkaConsumerMessageInterface $consumerMessage Incoming Kafka message + * + * @return KafkaConsumerMessageInterface + */ + public function decode(KafkaConsumerMessageInterface $consumerMessage): KafkaConsumerMessageInterface + { + $body = $consumerMessage->getBody(); + + if (is_string($body) && strlen($body) > 5) { + // Confluent wire format: magic byte 0x00 + if (ord($body[0]) === 0) { + // Try JSON Schema first (payload is UTF-8 JSON text) + $jsonDecoded = JsonSchemaDecoder::tryDecode($body); + if ($jsonDecoded !== null) { + if ($this->jsonSchemaDecoder !== null) { + try { + return $this->jsonSchemaDecoder->decode($consumerMessage); + } catch (\Throwable) { + // Fall through to return basic decoded + } + } + return new KafkaDecodedMessage($consumerMessage, $jsonDecoded); + } + + // Fall back to Avro (binary payload) + try { + return $this->avroDecoder->decode($consumerMessage); + } catch (\Throwable) { + // fallback to raw + } + } + + // Plain JSON: starts with { or [ + $firstChar = ltrim($body)[0] ?? ''; + if ($firstChar === '{' || $firstChar === '[') { + $decoded = json_decode($body, true); + if (json_last_error() === JSON_ERROR_NONE) { + return new KafkaDecodedMessage($consumerMessage, $decoded); + } + } + } + + return $consumerMessage; + } +} diff --git a/src/Libraries/Kafka/KafkaDecodedMessage.php b/src/Libraries/Kafka/KafkaDecodedMessage.php new file mode 100644 index 0000000..6288852 --- /dev/null +++ b/src/Libraries/Kafka/KafkaDecodedMessage.php @@ -0,0 +1,116 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; + +/** + * KafkaDecodedMessage + * + * Wraps a raw KafkaConsumerMessage and replaces its body with an already-decoded + * value (e.g. an associative array from JSON). All other accessors are delegated + * to the original message to preserve topic, partition, offset, and header info. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class KafkaDecodedMessage implements KafkaConsumerMessageInterface +{ + /** + * KafkaDecodedMessage constructor + * + * @param KafkaConsumerMessageInterface $original Original raw Kafka message + * @param mixed $decodedBody Pre-decoded message body + */ + public function __construct( + private KafkaConsumerMessageInterface $original, + private mixed $decodedBody + ) { + } + + /** + * Get the decoded message body + * + * @return mixed + */ + public function getBody(): mixed + { + return $this->decodedBody; + } + + /** + * Get the message key + * + * @return string|null + */ + public function getKey(): ?string + { + return $this->original->getKey(); + } + + /** + * Get the topic name + * + * @return string + */ + public function getTopicName(): string + { + return $this->original->getTopicName(); + } + + /** + * Get the partition number + * + * @return int + */ + public function getPartition(): int + { + return $this->original->getPartition(); + } + + /** + * Get the message headers + * + * @return array + */ + public function getHeaders(): array + { + return $this->original->getHeaders(); + } + + /** + * Get the message offset + * + * @return int + */ + public function getOffset(): int + { + return $this->original->getOffset(); + } + + /** + * Get the message timestamp + * + * @return int + */ + public function getTimestamp(): int + { + return $this->original->getTimestamp(); + } +} diff --git a/src/Libraries/Kafka/KafkaJsonSchema.php b/src/Libraries/Kafka/KafkaJsonSchema.php new file mode 100644 index 0000000..9dec190 --- /dev/null +++ b/src/Libraries/Kafka/KafkaJsonSchema.php @@ -0,0 +1,92 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +/** + * KafkaJsonSchema + * + * Holds a JSON Schema definition with its subject name and resolved schema ID + * from the Confluent Schema Registry. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class KafkaJsonSchema +{ + /** + * Schema ID from registry (null until registered/fetched) + * + * @var int|null + */ + private ?int $schemaId = null; + + /** + * KafkaJsonSchema constructor + * + * @param string $subject Subject name in the registry (e.g. "topic-value") + * @param string $definition Raw JSON Schema definition string + */ + public function __construct( + private string $subject, + private string $definition + ) { + } + + /** + * Get the subject name + * + * @return string + */ + public function getSubject(): string + { + return $this->subject; + } + + /** + * Get the JSON Schema definition string + * + * @return string + */ + public function getDefinition(): string + { + return $this->definition; + } + + /** + * Get the resolved schema ID + * + * @return int|null + */ + public function getSchemaId(): ?int + { + return $this->schemaId; + } + + /** + * Set the resolved schema ID + * + * @param int $schemaId Schema ID from registry + * + * @return void + */ + public function setSchemaId(int $schemaId): void + { + $this->schemaId = $schemaId; + } +} diff --git a/tests/Libraries/ClientExternalTest.php b/tests/Libraries/ClientExternalTest.php index 53fae2f..df83f8d 100644 --- a/tests/Libraries/ClientExternalTest.php +++ b/tests/Libraries/ClientExternalTest.php @@ -11,7 +11,6 @@ use GuzzleHttp\Psr7\Request; use GuzzleHttp\Psr7\Response; use GuzzleHttp\Psr7\Utils; -use Illuminate\Support\Facades\Redis; use Laravel\Lumen\Testing\TestCase; use Spotlibs\PhpLib\Libraries\ClientExternal; use GuzzleHttp\Handler\MockHandler; @@ -46,12 +45,16 @@ public function testCallEksternal1(): void public function testCallExternalMultipartSuccess(): void { - Redis::shouldReceive('get')->andReturn(json_encode([ + $redisMock = \Mockery::mock(); + $redisMock->shouldReceive('get')->andReturn(json_encode([ 'id' => 1, 'target_url' => 'https://jsonplaceholder.typicode.com/posts', 'mock_url' => 'https://jsonplaceholder.typicode.com/posts', 'flag' => true, ])); + $this->app->singleton('redis', function () use ($redisMock) { + return $redisMock; + }); $f = fopen('public/docs/hello.txt', 'w'); fwrite($f, 'hello world'); fclose($f); diff --git a/tests/Libraries/Kafka/JsonSchemaDecoderTest.php b/tests/Libraries/Kafka/JsonSchemaDecoderTest.php new file mode 100644 index 0000000..599475f --- /dev/null +++ b/tests/Libraries/Kafka/JsonSchemaDecoderTest.php @@ -0,0 +1,154 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; +use Mockery; +use PHPUnit\Framework\TestCase; +use Spotlibs\PhpLib\Exceptions\ParameterException; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaDecoder; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaRegistry; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage; + +/** + * JsonSchemaDecoderTest + * + * Unit test for JsonSchemaDecoder + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaDecoder + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage + * @covers \Spotlibs\PhpLib\Exceptions\ParameterException + */ +class JsonSchemaDecoderTest extends TestCase +{ + protected function tearDown(): void + { + Mockery::close(); + parent::tearDown(); + } + + /** @test */ + public function testDecodeReturnsOriginalWhenBodyNotString(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->once()->andReturn(null); + + $decoder = new JsonSchemaDecoder($registry); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodeReturnsOriginalWhenBodyTooShort(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->once()->andReturn('abc'); + + $decoder = new JsonSchemaDecoder($registry); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodeReturnsOriginalWhenNoMagicByte(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $body = 'X' . pack('N', 1) . '{"a":1}'; + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->once()->andReturn($body); + + $decoder = new JsonSchemaDecoder($registry); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodeSuccess(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('getSchemaById')->once()->with(1)->andReturn('{}'); + + $payload = json_encode(['name' => 'test']); + $body = "\x00" . pack('N', 1) . $payload; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->once()->andReturn($body); + + $decoder = new JsonSchemaDecoder($registry); + $result = $decoder->decode($msg); + + $this->assertInstanceOf(KafkaDecodedMessage::class, $result); + $this->assertSame(['name' => 'test'], $result->getBody()); + } + + /** @test */ + public function testDecodeThrowsOnInvalidJson(): void + { + $this->expectException(ParameterException::class); + + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('getSchemaById')->once()->andReturn('{}'); + + $body = "\x00" . pack('N', 1) . '{invalid json'; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->once()->andReturn($body); + + $decoder = new JsonSchemaDecoder($registry); + $decoder->decode($msg); + } + + /** @test */ + public function testTryDecodeReturnsNullWhenTooShort(): void + { + $result = JsonSchemaDecoder::tryDecode('abc'); + $this->assertNull($result); + } + + /** @test */ + public function testTryDecodeReturnsNullWhenNoMagicByte(): void + { + $body = 'X' . pack('N', 1) . '{"a":1}'; + $result = JsonSchemaDecoder::tryDecode($body); + $this->assertNull($result); + } + + /** @test */ + public function testTryDecodeReturnsNullOnInvalidJson(): void + { + $body = "\x00" . pack('N', 1) . '{not json'; + $result = JsonSchemaDecoder::tryDecode($body); + $this->assertNull($result); + } + + /** @test */ + public function testTryDecodeReturnsArrayOnValidJson(): void + { + $body = "\x00" . pack('N', 1) . json_encode(['foo' => 'bar']); + $result = JsonSchemaDecoder::tryDecode($body); + $this->assertSame(['foo' => 'bar'], $result); + } +} diff --git a/tests/Libraries/Kafka/JsonSchemaEncoderTest.php b/tests/Libraries/Kafka/JsonSchemaEncoderTest.php new file mode 100644 index 0000000..5f18bdf --- /dev/null +++ b/tests/Libraries/Kafka/JsonSchemaEncoderTest.php @@ -0,0 +1,259 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use Jobcloud\Kafka\Message\KafkaProducerMessageInterface; +use Mockery; +use PHPUnit\Framework\TestCase; +use Spotlibs\PhpLib\Exceptions\ParameterException; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaEncoder; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaRegistry; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaJsonSchema; + +/** + * JsonSchemaEncoderTest + * + * Unit test for JsonSchemaEncoder + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaEncoder + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaJsonSchema + * @covers \Spotlibs\PhpLib\Exceptions\ParameterException + */ +class JsonSchemaEncoderTest extends TestCase +{ + protected function tearDown(): void + { + Mockery::close(); + parent::tearDown(); + } + + /** @test */ + public function testEncodeBodyWithRegisteredSchema(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('register')->once()->andReturn(7); + + $bodySchema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['name' => 'test']); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(null); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeBodyUsesExistingSchemaId(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('register')->never(); + + $bodySchema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $bodySchema->setSchemaId(5); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn('{"name":"test"}'); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(null); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeWithKeySchema(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('register')->twice()->andReturn(10); + + $bodySchema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $keySchema = new KafkaJsonSchema('topic-key', '{"type":"string"}'); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['id' => 1]); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn('my-key'); + $msg->shouldReceive('withKey')->andReturnSelf(); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema, $keySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeThrowsOnInvalidSchemaDefinition(): void + { + $this->expectException(ParameterException::class); + + $registry = Mockery::mock(JsonSchemaRegistry::class); + $bodySchema = new KafkaJsonSchema('topic-value', 'not valid json schema{{{'); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['name' => 'test']); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $encoder->encode($msg); + } + + /** @test */ + public function testEncodeThrowsOnInvalidPayload(): void + { + $this->expectException(\TypeError::class); + + $registry = Mockery::mock(JsonSchemaRegistry::class); + $bodySchema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(NAN); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $encoder->encode($msg); + } + + /** @test */ + public function testEncodeThrowsOnMissingRequiredField(): void + { + $this->expectException(ParameterException::class); + + $registry = Mockery::mock(JsonSchemaRegistry::class); + $schema = '{"type":"object","required":["name"],"properties":{"name":{"type":"string"}}}'; + $bodySchema = new KafkaJsonSchema('topic-value', $schema); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['age' => 30]); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $encoder->encode($msg); + } + + /** @test */ + public function testEncodeThrowsOnTypeMismatch(): void + { + $this->expectException(ParameterException::class); + + $registry = Mockery::mock(JsonSchemaRegistry::class); + $schema = '{"type":"object","properties":{"name":{"type":"string"}}}'; + $bodySchema = new KafkaJsonSchema('topic-value', $schema); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['name' => 123]); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $encoder->encode($msg); + } + + /** @test */ + public function testEncodePassesWithCorrectTypes(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $schema = '{"type":"object","required":["name","age"],"properties":{"name":{"type":"string"},"age":{"type":"integer"},"active":{"type":"boolean"},"score":{"type":"number"},"tags":{"type":"array"},"meta":{"type":"object"},"nothing":{"type":"null"}}}'; + $bodySchema = new KafkaJsonSchema('topic-value', $schema); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn([ + 'name' => 'test', + 'age' => 25, + 'active' => true, + 'score' => 9.5, + 'tags' => ['a'], + 'meta' => (object)['k' => 'v'], + 'nothing' => null, + ]); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(null); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeSkipsTypeValidationWhenNoTypeInSchema(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $schema = '{"type":"object","properties":{"name":{}}}'; + $bodySchema = new KafkaJsonSchema('topic-value', $schema); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['name' => 123]); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(null); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeWithMultiTypeAllowsNullable(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $schema = '{"type":"object","properties":{"name":{"type":["string","null"]}}}'; + $bodySchema = new KafkaJsonSchema('topic-value', $schema); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['name' => null]); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(null); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeWithKeySchemaAndNonStringKey(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('register')->twice()->andReturn(10); + + $bodySchema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $keySchema = new KafkaJsonSchema('topic-key', '{"type":"object"}'); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['id' => 1]); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(['id' => 1]); + $msg->shouldReceive('withKey')->andReturnSelf(); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema, $keySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } +} diff --git a/tests/Libraries/Kafka/JsonSchemaRegistryTest.php b/tests/Libraries/Kafka/JsonSchemaRegistryTest.php new file mode 100644 index 0000000..85591f4 --- /dev/null +++ b/tests/Libraries/Kafka/JsonSchemaRegistryTest.php @@ -0,0 +1,164 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use GuzzleHttp\Client as GuzzleClient; +use Mockery; +use PHPUnit\Framework\TestCase; +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\StreamInterface; +use Spotlibs\PhpLib\Exceptions\ParameterException; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaRegistry; + +/** + * JsonSchemaRegistryTest + * + * Unit test for JsonSchemaRegistry + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaRegistry + * @covers \Spotlibs\PhpLib\Exceptions\ParameterException + */ +class JsonSchemaRegistryTest extends TestCase +{ + protected function tearDown(): void + { + Mockery::close(); + parent::tearDown(); + } + + /** @test */ + public function testRegisterSuccess(): void + { + $stream = Mockery::mock(StreamInterface::class); + $stream->shouldReceive('getContents')->once()->andReturn(json_encode(['id' => 10])); + + $response = Mockery::mock(ResponseInterface::class); + $response->shouldReceive('getBody')->once()->andReturn($stream); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('post')->once()->andReturn($response); + + $registry = new JsonSchemaRegistry($client); + $result = $registry->register('topic-value', '{"type":"object"}'); + + $this->assertSame(10, $result); + } + + /** @test */ + public function testRegisterThrowsWhenNoId(): void + { + $this->expectException(ParameterException::class); + + $stream = Mockery::mock(StreamInterface::class); + $stream->shouldReceive('getContents')->once()->andReturn(json_encode([])); + + $response = Mockery::mock(ResponseInterface::class); + $response->shouldReceive('getBody')->once()->andReturn($stream); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('post')->once()->andReturn($response); + + $registry = new JsonSchemaRegistry($client); + $registry->register('topic-value', '{"type":"object"}'); + } + + /** @test */ + public function testRegisterThrowsOnHttpError(): void + { + $this->expectException(ParameterException::class); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('post')->once()->andThrow(new \RuntimeException('connection failed')); + + $registry = new JsonSchemaRegistry($client); + $registry->register('topic-value', '{"type":"object"}'); + } + + /** @test */ + public function testGetSchemaByIdSuccess(): void + { + $stream = Mockery::mock(StreamInterface::class); + $stream->shouldReceive('getContents')->once()->andReturn(json_encode(['schema' => '{"type":"object"}'])); + + $response = Mockery::mock(ResponseInterface::class); + $response->shouldReceive('getBody')->once()->andReturn($stream); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('get')->once()->andReturn($response); + + $registry = new JsonSchemaRegistry($client); + $result = $registry->getSchemaById(5); + + $this->assertSame('{"type":"object"}', $result); + } + + /** @test */ + public function testGetSchemaByIdUsesCache(): void + { + // First call populates cache + $stream = Mockery::mock(StreamInterface::class); + $stream->shouldReceive('getContents')->once()->andReturn(json_encode(['id' => 5])); + + $response = Mockery::mock(ResponseInterface::class); + $response->shouldReceive('getBody')->once()->andReturn($stream); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('post')->once()->andReturn($response); + $client->shouldReceive('get')->never(); + + $registry = new JsonSchemaRegistry($client); + $registry->register('topic-value', '{"type":"object"}'); + + // Second call should use cache + $result = $registry->getSchemaById(5); + $this->assertSame('{"type":"object"}', $result); + } + + /** @test */ + public function testGetSchemaByIdThrowsWhenNoSchema(): void + { + $this->expectException(ParameterException::class); + + $stream = Mockery::mock(StreamInterface::class); + $stream->shouldReceive('getContents')->once()->andReturn(json_encode([])); + + $response = Mockery::mock(ResponseInterface::class); + $response->shouldReceive('getBody')->once()->andReturn($stream); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('get')->once()->andReturn($response); + + $registry = new JsonSchemaRegistry($client); + $registry->getSchemaById(99); + } + + /** @test */ + public function testGetSchemaByIdThrowsOnHttpError(): void + { + $this->expectException(ParameterException::class); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('get')->once()->andThrow(new \RuntimeException('timeout')); + + $registry = new JsonSchemaRegistry($client); + $registry->getSchemaById(99); + } +} diff --git a/tests/Libraries/Kafka/KafkaAutoDecoderTest.php b/tests/Libraries/Kafka/KafkaAutoDecoderTest.php new file mode 100644 index 0000000..ecfcddc --- /dev/null +++ b/tests/Libraries/Kafka/KafkaAutoDecoderTest.php @@ -0,0 +1,232 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use FlixTech\AvroSerializer\Objects\RecordSerializer; +use Jobcloud\Kafka\Message\Decoder\AvroDecoder; +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; +use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistryInterface; +use Mockery; +use PHPUnit\Framework\TestCase; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaDecoder; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaAutoDecoder; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage; + +/** + * KafkaAutoDecoderTest + * + * Unit test for KafkaAutoDecoder + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaAutoDecoder + * @covers \Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaDecoder + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage + */ +class KafkaAutoDecoderTest extends TestCase +{ + private AvroDecoder $avroDecoder; + + protected function setUp(): void + { + parent::setUp(); + $registry = Mockery::mock(AvroSchemaRegistryInterface::class); + $recordSerializer = Mockery::mock(RecordSerializer::class); + $this->avroDecoder = new AvroDecoder($registry, $recordSerializer); + } + + protected function tearDown(): void + { + Mockery::close(); + parent::tearDown(); + } + + /** @test */ + public function testDecodeReturnsOriginalWhenBodyNotString(): void + { + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(null); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodeReturnsOriginalWhenBodyTooShort(): void + { + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn('short'); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodeWireFormatJsonSchemaWithDecoder(): void + { + $payload = json_encode(['key' => 'value']); + $body = "\x00" . pack('N', 1) . $payload; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $decodedMsg = Mockery::mock(KafkaConsumerMessageInterface::class); + + $jsonDecoder = Mockery::mock(JsonSchemaDecoder::class); + $jsonDecoder->shouldReceive('decode')->once()->andReturn($decodedMsg); + + $decoder = new KafkaAutoDecoder($this->avroDecoder, $jsonDecoder); + $result = $decoder->decode($msg); + + $this->assertSame($decodedMsg, $result); + } + + /** @test */ + public function testDecodeWireFormatJsonSchemaFallsBackOnDecoderException(): void + { + $payload = json_encode(['key' => 'value']); + $body = "\x00" . pack('N', 1) . $payload; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $jsonDecoder = Mockery::mock(JsonSchemaDecoder::class); + $jsonDecoder->shouldReceive('decode')->once()->andThrow(new \RuntimeException('fail')); + + $decoder = new KafkaAutoDecoder($this->avroDecoder, $jsonDecoder); + $result = $decoder->decode($msg); + + $this->assertInstanceOf(KafkaDecodedMessage::class, $result); + $this->assertSame(['key' => 'value'], $result->getBody()); + } + + /** @test */ + public function testDecodeWireFormatJsonSchemaWithoutDecoder(): void + { + $payload = json_encode(['key' => 'value']); + $body = "\x00" . pack('N', 1) . $payload; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertInstanceOf(KafkaDecodedMessage::class, $result); + $this->assertSame(['key' => 'value'], $result->getBody()); + } + + /** @test */ + public function testDecodeWireFormatFallsBackToAvro(): void + { + // Binary payload that is NOT valid JSON after header + $body = "\x00" . pack('N', 1) . "\x02\x06foo"; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + $msg->shouldReceive('getTopicName')->andReturn('test-topic'); + $msg->shouldReceive('getPartition')->andReturn(0); + $msg->shouldReceive('getOffset')->andReturn(0); + $msg->shouldReceive('getTimestamp')->andReturn(0); + $msg->shouldReceive('getKey')->andReturn(null); + $msg->shouldReceive('getHeaders')->andReturn([]); + + $registry = Mockery::mock(AvroSchemaRegistryInterface::class); + $registry->shouldReceive('hasBodySchemaForTopic')->andReturn(false); + $registry->shouldReceive('hasKeySchemaForTopic')->andReturn(false); + + $recordSerializer = Mockery::mock(RecordSerializer::class); + $avroDecoder = new AvroDecoder($registry, $recordSerializer); + + $decoder = new KafkaAutoDecoder($avroDecoder); + $result = $decoder->decode($msg); + + // AvroDecoder returns a new KafkaConsumerMessage (not the same instance) + $this->assertInstanceOf(KafkaConsumerMessageInterface::class, $result); + } + + /** @test */ + public function testDecodeWireFormatReturnsOriginalWhenAvroFails(): void + { + $body = "\x00" . pack('N', 1) . "\x02\x06foo"; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + $msg->shouldReceive('getTopicName')->andReturn('test-topic'); + + $registry = Mockery::mock(AvroSchemaRegistryInterface::class); + $registry->shouldReceive('hasBodySchemaForTopic')->andThrow(new \RuntimeException('avro fail')); + + $recordSerializer = Mockery::mock(RecordSerializer::class); + $avroDecoder = new AvroDecoder($registry, $recordSerializer); + + $decoder = new KafkaAutoDecoder($avroDecoder); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodePlainJson(): void + { + $body = json_encode(['hello' => 'world']); + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertInstanceOf(KafkaDecodedMessage::class, $result); + $this->assertSame(['hello' => 'world'], $result->getBody()); + } + + /** @test */ + public function testDecodePlainJsonArray(): void + { + $body = json_encode([1, 2, 3]); + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertInstanceOf(KafkaDecodedMessage::class, $result); + $this->assertSame([1, 2, 3], $result->getBody()); + } + + /** @test */ + public function testDecodeReturnsOriginalForNonJsonPlainText(): void + { + $body = 'this is just plain text that is longer than 5 chars'; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } +} diff --git a/tests/Libraries/Kafka/KafkaDecodedMessageTest.php b/tests/Libraries/Kafka/KafkaDecodedMessageTest.php new file mode 100644 index 0000000..62d3248 --- /dev/null +++ b/tests/Libraries/Kafka/KafkaDecodedMessageTest.php @@ -0,0 +1,112 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; +use Mockery; +use PHPUnit\Framework\TestCase; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage; + +/** + * KafkaDecodedMessageTest + * + * Unit test for KafkaDecodedMessage + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage + */ +class KafkaDecodedMessageTest extends TestCase +{ + protected function tearDown(): void + { + Mockery::close(); + parent::tearDown(); + } + + /** @test */ + public function testGetBodyReturnsDecodedBody(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $decoded = ['key' => 'value']; + + $msg = new KafkaDecodedMessage($original, $decoded); + $this->assertSame($decoded, $msg->getBody()); + } + + /** @test */ + public function testGetKeyDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getKey')->once()->andReturn('my-key'); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame('my-key', $msg->getKey()); + } + + /** @test */ + public function testGetTopicNameDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getTopicName')->once()->andReturn('my-topic'); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame('my-topic', $msg->getTopicName()); + } + + /** @test */ + public function testGetPartitionDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getPartition')->once()->andReturn(3); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame(3, $msg->getPartition()); + } + + /** @test */ + public function testGetHeadersDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getHeaders')->once()->andReturn(['h' => 'v']); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame(['h' => 'v'], $msg->getHeaders()); + } + + /** @test */ + public function testGetOffsetDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getOffset')->once()->andReturn(100); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame(100, $msg->getOffset()); + } + + /** @test */ + public function testGetTimestampDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getTimestamp')->once()->andReturn(1234567890); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame(1234567890, $msg->getTimestamp()); + } +} diff --git a/tests/Libraries/Kafka/KafkaJsonSchemaTest.php b/tests/Libraries/Kafka/KafkaJsonSchemaTest.php new file mode 100644 index 0000000..5c08dac --- /dev/null +++ b/tests/Libraries/Kafka/KafkaJsonSchemaTest.php @@ -0,0 +1,63 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use PHPUnit\Framework\TestCase; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaJsonSchema; + +/** + * KafkaJsonSchemaTest + * + * Unit test for KafkaJsonSchema + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaJsonSchema + */ +class KafkaJsonSchemaTest extends TestCase +{ + /** @test */ + public function testGetSubject(): void + { + $schema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $this->assertSame('topic-value', $schema->getSubject()); + } + + /** @test */ + public function testGetDefinition(): void + { + $schema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $this->assertSame('{"type":"object"}', $schema->getDefinition()); + } + + /** @test */ + public function testGetSchemaIdReturnsNullByDefault(): void + { + $schema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $this->assertNull($schema->getSchemaId()); + } + + /** @test */ + public function testSetAndGetSchemaId(): void + { + $schema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $schema->setSchemaId(42); + $this->assertSame(42, $schema->getSchemaId()); + } +}