From 00addfc8cb2bb3ac2c8e122b92e6eca0c9f7b7d7 Mon Sep 17 00:00:00 2001 From: Mufthi Ryanda <77824812+mufthiryanda@users.noreply.github.com> Date: Tue, 27 Jan 2026 12:58:16 +0700 Subject: [PATCH 01/11] feat : add make commad kafka scheme --- src/Commands/KafkaSchemaMakeCommand.php | 316 ++++++++++++++++++++++ src/Commands/stubs/kafka.schema.avro.stub | 85 ++++++ src/Commands/stubs/kafka.schema.json.stub | 85 ++++++ 3 files changed, 486 insertions(+) create mode 100644 src/Commands/KafkaSchemaMakeCommand.php create mode 100644 src/Commands/stubs/kafka.schema.avro.stub create mode 100644 src/Commands/stubs/kafka.schema.json.stub diff --git a/src/Commands/KafkaSchemaMakeCommand.php b/src/Commands/KafkaSchemaMakeCommand.php new file mode 100644 index 0000000..87e9c25 --- /dev/null +++ b/src/Commands/KafkaSchemaMakeCommand.php @@ -0,0 +1,316 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Commands; + +use Illuminate\Console\GeneratorCommand; +use Illuminate\Contracts\Filesystem\FileNotFoundException; +use JsonException; +use Symfony\Component\Console\Input\InputOption; + +/** + * KafkaSchemaMakeCommand + * + * Custom command for Kafka schema + * + * @category Console + * @package Commands + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class KafkaSchemaMakeCommand extends GeneratorCommand +{ + /** + * The console command name. + * + * @var string + */ + protected $name = 'make:kafka-schema'; + + /** + * The console command description. + * + * @var string + */ + protected $description = 'Create a new Kafka schema model class'; + + /** + * The type of class being generated. + * + * @var string + */ + protected $type = 'Kafka Schema Model'; + + /** + * Schema input from user + * + * @var array + */ + protected array $schemaData = []; + + /** + * Schema type + * + * @var string + */ + protected string $schemaType = 'avro'; + + /** + * Execute the console command. + * + * @return int + * @throws FileNotFoundException + */ + public function handle() : int + { + $this->schemaType = $this->option('type') ?? 'avro'; + + $this->info('Please paste your ' . strtoupper($this->schemaType) . ' schema (press Ctrl+D or Ctrl+Z when done):'); + if (!in_array($this->schemaType, ['avro', 'json'])) { + $this->error('Invalid schema type. Must be either "avro" or "json".'); + return 1; + } + + $schemaInput = stream_get_contents(STDIN); + + try { + $this->schemaData = json_decode($schemaInput, true, 512, JSON_THROW_ON_ERROR); + } catch (JsonException $e) { + $this->error('Invalid JSON schema: ' . $e->getMessage()); + return 1; + } + + parent::handle(); + + $this->createCollection(); + return 0; + } + + /** + * Get the stub file for the generator. + * + * @return string + */ + protected function getStub(): string + { + if ($this->schemaType === 'json') { + return __DIR__ . '/stubs/kafka.schema.json.stub'; + } + + return __DIR__ . '/stubs/kafka.schema.avro.stub'; + } + + /** + * Build the class with the given name. + * + * @param string $name name of the class + * + * @return string + * @throws FileNotFoundException + */ + protected function buildClass($name): string + { + $stub = parent::buildClass($name); + + return $this->replaceSchema($stub); + } + + /** + * Replace schema placeholders in stub + * + * @param string $stub stub content + * + * @return string + */ + protected function replaceSchema(string $stub): string + { + $className = class_basename($this->argument('name')); + + if ($this->schemaType === 'avro') { + $fields = $this->schemaData['fields'] ?? []; + $fillable = $this->generateFillableFromAvro($fields); + $casts = $this->generateCastsFromAvro($fields); + $schemaName = $this->schemaData['name'] ?? 'value_Schema' . str_replace('CDC', '', $className); + $namespace = $this->schemaData['namespace'] ?? strtolower(preg_replace('/(?schemaData['properties'] ?? []; + $fillable = $this->generateFillableFromJson($properties); + $casts = $this->generateCastsFromJson($properties); + $schemaName = 'Schema' . str_replace('CDC', '', $className); + $namespace = strtolower(preg_replace('/(?formatSchemaBody(), $stub); + $stub = str_replace('DummySchemaName', $schemaName, $stub); + $stub = str_replace('DummySchemaNamespace', $namespace, $stub); + + return $stub; + } + + /** + * Generate fillable array from AVRO fields + * + * @param array $fields AVRO fields + * + * @return string + */ + protected function generateFillableFromAvro(array $fields): string + { + $fillable = array_map(fn($field) => " '{$field['name']}'", $fields); + return implode(",\n", $fillable); + } + + /** + * Generate casts array from AVRO fields + * + * @param array $fields AVRO fields + * + * @return string + */ + protected function generateCastsFromAvro(array $fields): string + { + $casts = []; + foreach ($fields as $field) { + $type = is_array($field['type']) ? $field['type'][1] ?? 'string' : $field['type']; + $phpType = $this->mapAvroTypeToPhp($type); + $casts[] = " '{$field['name']}' => '{$phpType}'"; + } + return implode(",\n", $casts); + } + + /** + * Generate fillable array from JSON properties + * + * @param array $properties JSON properties + * + * @return string + */ + protected function generateFillableFromJson(array $properties): string + { + $fillable = array_map(fn($name) => " '{$name}'", array_keys($properties)); + return implode(",\n", $fillable); + } + + /** + * Generate casts array from JSON properties + * + * @param array $properties JSON properties + * + * @return string + */ + protected function generateCastsFromJson(array $properties): string + { + $casts = []; + foreach ($properties as $name => $definition) { + $type = $definition['type'] ?? 'string'; + $phpType = $this->mapJsonTypeToPhp($type); + $casts[] = " '{$name}' => '{$phpType}'"; + } + return implode(",\n", $casts); + } + + /** + * Map AVRO type to PHP cast type + * + * @param string $avroType AVRO type + * + * @return string + */ + protected function mapAvroTypeToPhp(string $avroType): string + { + return match ($avroType) { + 'int', 'long' => 'integer', + 'float', 'double' => 'float', + 'boolean' => 'boolean', + 'string' => 'string', + default => 'string', + }; + } + + /** + * Map JSON type to PHP cast type + * + * @param string $jsonType JSON type + * + * @return string + */ + protected function mapJsonTypeToPhp(string $jsonType): string + { + return match ($jsonType) { + 'integer' => 'integer', + 'number' => 'float', + 'boolean' => 'boolean', + 'string' => 'string', + default => 'string', + }; + } + + /** + * Format schema body for output + * + * @return string + */ + protected function formatSchemaBody(): string + { + $schema = json_encode($this->schemaData, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES); + $lines = explode("\n", $schema); + $indented = array_map(fn($line) => ' ' . $line, $lines); + return implode("\n", $indented); + } + + /** + * Create a collection file for the model. + * + * @return void + */ + protected function createCollection(): void + { + $className = class_basename($this->argument('name')); + $this->call( + 'make:collection', + [ + 'name' => $className, + '--resource' => true + ] + ); + } + + /** + * Get the default namespace for the class. + * + * @param string $rootNamespace root namespace (generally App) + * + * @return string + */ + protected function getDefaultNamespace($rootNamespace): string + { + return $rootNamespace . '\Models'; + } + + /** + * Get the console command options. + * + * @return array + */ + protected function getOptions(): array + { + return [ + ['type', null, InputOption::VALUE_OPTIONAL, 'Schema type (avro or json)', 'avro'], + ]; + } +} \ No newline at end of file diff --git a/src/Commands/stubs/kafka.schema.avro.stub b/src/Commands/stubs/kafka.schema.avro.stub new file mode 100644 index 0000000..e4f611d --- /dev/null +++ b/src/Commands/stubs/kafka.schema.avro.stub @@ -0,0 +1,85 @@ + Date: Tue, 27 Jan 2026 15:44:25 +0700 Subject: [PATCH 02/11] fix : change into single line --- src/Commands/KafkaSchemaMakeCommand.php | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/Commands/KafkaSchemaMakeCommand.php b/src/Commands/KafkaSchemaMakeCommand.php index 87e9c25..a3ad5ef 100644 --- a/src/Commands/KafkaSchemaMakeCommand.php +++ b/src/Commands/KafkaSchemaMakeCommand.php @@ -74,17 +74,16 @@ class KafkaSchemaMakeCommand extends GeneratorCommand * @return int * @throws FileNotFoundException */ - public function handle() : int + public function handle(): int { $this->schemaType = $this->option('type') ?? 'avro'; - $this->info('Please paste your ' . strtoupper($this->schemaType) . ' schema (press Ctrl+D or Ctrl+Z when done):'); if (!in_array($this->schemaType, ['avro', 'json'])) { $this->error('Invalid schema type. Must be either "avro" or "json".'); return 1; } - $schemaInput = stream_get_contents(STDIN); + $schemaInput = $this->ask('Paste your ' . strtoupper($this->schemaType) . ' schema (single line)'); try { $this->schemaData = json_decode($schemaInput, true, 512, JSON_THROW_ON_ERROR); @@ -94,7 +93,6 @@ public function handle() : int } parent::handle(); - $this->createCollection(); return 0; } @@ -281,13 +279,7 @@ protected function formatSchemaBody(): string protected function createCollection(): void { $className = class_basename($this->argument('name')); - $this->call( - 'make:collection', - [ - 'name' => $className, - '--resource' => true - ] - ); + $this->call('make:collection', ['name' => $className]); } /** From b11baf3cdc3812c029213be9f4022df8a6155540 Mon Sep 17 00:00:00 2001 From: Mufthi Ryanda <77824812+mufthiryanda@users.noreply.github.com> Date: Thu, 29 Jan 2026 15:48:08 +0700 Subject: [PATCH 03/11] feat : add publish kafka library --- src/Libraries/Kafka.php | 432 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 432 insertions(+) create mode 100644 src/Libraries/Kafka.php diff --git a/src/Libraries/Kafka.php b/src/Libraries/Kafka.php new file mode 100644 index 0000000..a9febf2 --- /dev/null +++ b/src/Libraries/Kafka.php @@ -0,0 +1,432 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries; + +use AvroIOException; +use AvroSchema; +use AvroSchemaParseException; +use GuzzleHttp\Client as GuzzleClient; +use Jobcloud\Kafka\Exception\KafkaProducerException; +use Jobcloud\Kafka\Producer\KafkaProducer; +use Jobcloud\Kafka\Producer\KafkaProducerBuilder; +use Jobcloud\Kafka\Message\KafkaProducerMessage; +use Jobcloud\Kafka\Message\Encoder\AvroEncoder; +use Jobcloud\Kafka\Message\Encoder\JsonEncoder; +use Jobcloud\Kafka\Message\KafkaAvroSchema; +use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface; +use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry; +use FlixTech\AvroSerializer\Objects\RecordSerializer; +use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; +use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; +use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; +use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; +use Spotlibs\PhpLib\Exceptions\ParameterException; +use Spotlibs\PhpLib\Logs\Log; + +/** + * Kafka + * + * Kafka producer client for publishing messages with schema support + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class Kafka +{ + /** + * Schema type: No schema, no encoding (raw data) + */ + public const SCHEMALESS = 1; + + /** + * Schema type: JSON schema with validation and encoding + */ + public const JSON_SCHEMA = 2; + + /** + * Schema type: Avro schema with registry and encoding + */ + public const AVRO_SCHEMA = 3; + + /** + * Current producer instance + * + * @var KafkaProducer|null + */ + private static ?KafkaProducer $producer = null; + + /** + * Current topic name + * + * @var string|null + */ + private static ?string $currentTopic = null; + + /** + * Setup and return Kafka producer + * + * @param string $topic Topic name + * @param int $schemaType Schema type constant + * @param string|null $schemaBody Schema definition for message body + * @param string|null $schemaKey Schema definition for message key + * @param array $additionalConfig Additional Kafka configuration + * + * @return KafkaProducer + * @throws AvroIOException + * @throws AvroSchemaParseException + * @throws KafkaProducerException + * @throws ParameterException + */ + public static function publishOn( + string $topic, + int $schemaType, + ?string $schemaBody = null, + ?string $schemaKey = null, + array $additionalConfig = [] + ): KafkaProducer { + self::validateEnvironment(); + self::$currentTopic = $topic; + + $producerBuilder = self::createProducerBuilder($additionalConfig); + + if ($schemaType === self::AVRO_SCHEMA) { + $encoder = self::createAvroEncoder($topic, $schemaBody, $schemaKey); + $producerBuilder->withEncoder($encoder); + } elseif ($schemaType === self::JSON_SCHEMA) { + $encoder = new JsonEncoder(); + $producerBuilder->withEncoder($encoder); + } + + self::$producer = $producerBuilder->build(); + + Log::runtime()->info( + [ + 'operation' => 'kafka_producer_initialized', + 'topic' => $topic, + 'schemaType' => $schemaType + ] + ); + + return self::$producer; + } + + /** + * Produce a message to the current topic + * + * @param mixed $body Message body + * @param string|null $key Message key for partitioning + * @param int $partition Partition number (default 0) + * + * @return void + * @throws ParameterException + */ + public static function produce(mixed $body, ?string $key = null, int $partition = 0): void + { + self::ensureProducerInitialized(); + + $message = KafkaProducerMessage::create(self::$currentTopic, $partition) + ->withBody($body); + + if ($key !== null) { + $message = $message->withKey($key); + } + + self::$producer->produce($message); + } + + /** + * Produce multiple messages in batch + * + * @param array $messages Array of messages with format: ['body' => mixed, 'key' => ?string, 'partition' => int] + * + * @return void + * @throws ParameterException + */ + public static function produceBatch(array $messages): void + { + self::ensureProducerInitialized(); + + foreach ($messages as $msg) { + $body = $msg['body'] ?? null; + $key = $msg['key'] ?? null; + $partition = $msg['partition'] ?? 0; + + if ($body === null) { + continue; + } + + self::produce($body, $key, $partition); + } + } + + /** + * Flush all queued messages to Kafka + * + * @param int $timeoutMs Timeout in milliseconds (default 10000) + * + * @return void + * @throws ParameterException + */ + public static function flush(int $timeoutMs = 10000): void + { + self::ensureProducerInitialized(); + + $startTime = microtime(true); + self::$producer->flush($timeoutMs); + $elapsed = microtime(true) - $startTime; + + Log::runtime()->info( + [ + 'operation' => 'kafka_flush', + 'topic' => self::$currentTopic, + 'responseTime' => round($elapsed * 1000) + ] + ); + } + + /** + * Convenience method: publish single message immediately + * + * @param string $topic Topic name + * @param mixed $body Message body + * @param int $schemaType Schema type constant (default SCHEMALESS) + * @param string|null $schemaBody Schema definition for body + * @param string|null $schemaKey Schema definition for key + * @param string|null $key Message key + * @param int $partition Partition number (default 0) + * @param int $flushTimeoutMs Flush timeout in milliseconds (default 10000) + * + * @return void + * @throws AvroIOException + * @throws AvroSchemaParseException + * @throws KafkaProducerException + * @throws ParameterException + */ + public static function publish( + string $topic, + mixed $body, + int $schemaType = self::SCHEMALESS, + ?string $schemaBody = null, + ?string $schemaKey = null, + ?string $key = null, + int $partition = 0, + int $flushTimeoutMs = 10000 + ): void { + self::publishOn($topic, $schemaType, $schemaBody, $schemaKey); + self::produce($body, $key, $partition); + self::flush($flushTimeoutMs); + } + + /** + * Validate required environment variables + * + * @return void + * @throws ParameterException + */ + private static function validateEnvironment(): void + { + $required = [ + 'KAFKA_BROKERS_URL' => 'Kafka brokers URL', + 'KAFKA_USER_PRODUCE' => 'Kafka producer username', + 'KAFKA_PASS_PRODUCE' => 'Kafka producer password' + ]; + + foreach ($required as $env => $description) { + if (empty(env($env))) { + throw new ParameterException("Environment variable {$env} ({$description}) is not set"); + } + } + } + + /** + * Create producer builder with default configuration + * + * @param array $additionalConfig Additional configuration + * + * @return KafkaProducerBuilder + */ + private static function createProducerBuilder(array $additionalConfig): KafkaProducerBuilder + { + $defaultConfig = [ + 'compression.codec' => 'lz4', + 'sasl.username' => env('KAFKA_USER_PRODUCE'), + 'sasl.password' => env('KAFKA_PASS_PRODUCE'), + 'sasl.mechanism' => 'PLAIN', + 'security.protocol' => 'SASL_SSL', + 'message.timeout.ms' => '10000', + 'socket.timeout.ms' => '10000' + ]; + + $config = array_merge($defaultConfig, $additionalConfig); + + return KafkaProducerBuilder::create() + ->withAdditionalConfig($config) + ->withAdditionalBroker(env('KAFKA_BROKERS_URL')) + ->withDeliveryReportCallback([self::class, 'deliveryReportCallback']) + ->withErrorCallback([self::class, 'errorCallback']) + ->withLogCallback([self::class, 'logCallback']); + } + + /** + * Create Avro encoder with schema registry + * + * @param string $topic Topic name + * @param string|null $schemaBody Body schema definition + * @param string|null $schemaKey Key schema definition + * + * @return AvroEncoder + * @throws ParameterException + * @throws AvroIOException|AvroSchemaParseException + */ + private static function createAvroEncoder( + string $topic, + ?string $schemaBody, + ?string $schemaKey + ): AvroEncoder { + if (empty(env('KAFKA_SCHEMA_REGISTRY_URL'))) { + throw new ParameterException('Environment variable KAFKA_SCHEMA_REGISTRY_URL is not set'); + } + + $cachedRegistry = new CachedRegistry( + new BlockingRegistry( + new PromisingRegistry( + new GuzzleClient( + [ + 'base_uri' => env('KAFKA_SCHEMA_REGISTRY_URL'), + 'auth' => [env('KAFKA_USER_PRODUCE'), env('KAFKA_PASS_PRODUCE')] + ] + ) + ) + ), + new AvroObjectCacheAdapter() + ); + + $registry = new AvroSchemaRegistry($cachedRegistry); + $recordSerializer = new RecordSerializer( + $cachedRegistry, + [ + RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => true, + RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => true, + ] + ); + + if ($schemaBody !== null) { + $registry->addBodySchemaMappingForTopic( + $topic, + new KafkaAvroSchema( + $topic . '-value', + KafkaAvroSchemaInterface::LATEST_VERSION, + AvroSchema::parse($schemaBody) + ) + ); + } + + if ($schemaKey !== null) { + $registry->addKeySchemaMappingForTopic( + $topic, + new KafkaAvroSchema( + $topic . '-key', + KafkaAvroSchemaInterface::LATEST_VERSION, + AvroSchema::parse($schemaKey) + ) + ); + } + + return new AvroEncoder($registry, $recordSerializer); + } + + /** + * Ensure producer is initialized + * + * @return void + * @throws ParameterException + */ + private static function ensureProducerInitialized(): void + { + if (self::$producer === null) { + throw new ParameterException('Producer not initialized. Call publishOn() first.'); + } + } + + /** + * Delivery report callback + * + * @param mixed $kafka Kafka instance + * @param mixed $message Message + * + * @return void + */ + public static function deliveryReportCallback(mixed $kafka, mixed $message): void + { + if ($message->err !== 0) { + Log::runtime()->error( + [ + 'operation' => 'kafka_delivery_failed', + 'error' => $message->errstr(), + 'topic' => $message->topic_name, + 'partition' => $message->partition + ] + ); + } + } + + /** + * Error callback + * + * @param mixed $kafka Kafka instance + * @param int $err Error code + * @param string $reason Error reason + * + * @return void + */ + public static function errorCallback(mixed $kafka, int $err, string $reason): void + { + Log::runtime()->error( + [ + 'operation' => 'kafka_producer_error', + 'errorCode' => $err, + 'reason' => $reason + ] + ); + } + + /** + * Log callback + * + * @param mixed $kafka Kafka instance + * @param int $level Log level + * @param string $facility Facility + * @param string $message Log message + * + * @return void + */ + public static function logCallback(mixed $kafka, int $level, string $facility, string $message): void + { + if ($level <= 3) { + Log::runtime()->warning( + [ + 'operation' => 'kafka_producer_log', + 'level' => $level, + 'facility' => $facility, + 'message' => $message + ] + ); + } + } +} \ No newline at end of file From f098d23d28ccd9716d4339bc2cb5f69f3addf788 Mon Sep 17 00:00:00 2001 From: Mufthi Ryanda <77824812+mufthiryanda@users.noreply.github.com> Date: Fri, 30 Jan 2026 19:08:49 +0700 Subject: [PATCH 04/11] feat : add kafka subscriber --- src/Libraries/Kafka.php | 361 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 326 insertions(+), 35 deletions(-) diff --git a/src/Libraries/Kafka.php b/src/Libraries/Kafka.php index a9febf2..26cbe1c 100644 --- a/src/Libraries/Kafka.php +++ b/src/Libraries/Kafka.php @@ -15,24 +15,27 @@ namespace Spotlibs\PhpLib\Libraries; -use AvroIOException; use AvroSchema; -use AvroSchemaParseException; +use FlixTech\AvroSerializer\Objects\RecordSerializer; +use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; +use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; +use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; +use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use GuzzleHttp\Client as GuzzleClient; -use Jobcloud\Kafka\Exception\KafkaProducerException; -use Jobcloud\Kafka\Producer\KafkaProducer; -use Jobcloud\Kafka\Producer\KafkaProducerBuilder; -use Jobcloud\Kafka\Message\KafkaProducerMessage; +use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder; +use Jobcloud\Kafka\Consumer\KafkaConsumerBuilderInterface; +use Jobcloud\Kafka\Consumer\KafkaConsumerInterface; +use Jobcloud\Kafka\Message\Decoder\AvroDecoder; +use Jobcloud\Kafka\Message\Decoder\JsonDecoder; use Jobcloud\Kafka\Message\Encoder\AvroEncoder; use Jobcloud\Kafka\Message\Encoder\JsonEncoder; use Jobcloud\Kafka\Message\KafkaAvroSchema; use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface; +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; +use Jobcloud\Kafka\Message\KafkaProducerMessage; use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry; -use FlixTech\AvroSerializer\Objects\RecordSerializer; -use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; -use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; -use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; -use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; +use Jobcloud\Kafka\Producer\KafkaProducer; +use Jobcloud\Kafka\Producer\KafkaProducerBuilder; use Spotlibs\PhpLib\Exceptions\ParameterException; use Spotlibs\PhpLib\Logs\Log; @@ -71,6 +74,13 @@ class Kafka */ private static ?KafkaProducer $producer = null; + /** + * Current consumer instance + * + * @var KafkaConsumerInterface|null + */ + private static ?KafkaConsumerInterface $consumer = null; + /** * Current topic name * @@ -81,17 +91,13 @@ class Kafka /** * Setup and return Kafka producer * - * @param string $topic Topic name - * @param int $schemaType Schema type constant - * @param string|null $schemaBody Schema definition for message body - * @param string|null $schemaKey Schema definition for message key - * @param array $additionalConfig Additional Kafka configuration + * @param string $topic Topic name + * @param int $schemaType Schema type constant + * @param string|null $schemaBody Schema definition for message body + * @param string|null $schemaKey Schema definition for message key + * @param array $additionalConfig Additional Kafka configuration * * @return KafkaProducer - * @throws AvroIOException - * @throws AvroSchemaParseException - * @throws KafkaProducerException - * @throws ParameterException */ public static function publishOn( string $topic, @@ -150,6 +156,36 @@ public static function produce(mixed $body, ?string $key = null, int $partition self::$producer->produce($message); } + /** + * Produce a message with custom headers + * + * @param mixed $body Message body + * @param array $headers Message headers (key-value pairs) + * @param string|null $key Message key for partitioning + * @param int $partition Partition number (default 0) + * + * @return void + * @throws ParameterException + */ + public static function produceWithHeaders( + mixed $body, + array $headers, + ?string $key = null, + int $partition = 0 + ): void { + self::ensureProducerInitialized(); + + $message = KafkaProducerMessage::create(self::$currentTopic, $partition) + ->withBody($body) + ->withHeaders($headers); + + if ($key !== null) { + $message = $message->withKey($key); + } + + self::$producer->produce($message); + } + /** * Produce multiple messages in batch * @@ -200,22 +236,41 @@ public static function flush(int $timeoutMs = 10000): void ); } + /** + * Close producer and cleanup resources + * + * @param int $timeoutMs Timeout in milliseconds to flush remaining messages (default 10000) + * + * @return void + */ + public static function close(int $timeoutMs = 10000): void + { + if (self::$producer !== null) { + self::flush($timeoutMs); + self::$producer = null; + self::$currentTopic = null; + + Log::runtime()->info( + [ + 'operation' => 'kafka_producer_closed' + ] + ); + } + } + /** * Convenience method: publish single message immediately * - * @param string $topic Topic name - * @param mixed $body Message body - * @param int $schemaType Schema type constant (default SCHEMALESS) - * @param string|null $schemaBody Schema definition for body - * @param string|null $schemaKey Schema definition for key - * @param string|null $key Message key - * @param int $partition Partition number (default 0) - * @param int $flushTimeoutMs Flush timeout in milliseconds (default 10000) + * @param string $topic Topic name + * @param mixed $body Message body + * @param int $schemaType Schema type constant (default SCHEMALESS) + * @param string|null $schemaBody Schema definition for body + * @param string|null $schemaKey Schema definition for key + * @param string|null $key Message key + * @param int $partition Partition number (default 0) + * @param int $flushTimeoutMs Flush timeout in milliseconds (default 10000) * * @return void - * @throws AvroIOException - * @throws AvroSchemaParseException - * @throws KafkaProducerException * @throws ParameterException */ public static function publish( @@ -286,13 +341,12 @@ private static function createProducerBuilder(array $additionalConfig): KafkaPro /** * Create Avro encoder with schema registry * - * @param string $topic Topic name + * @param string $topic Topic name * @param string|null $schemaBody Body schema definition - * @param string|null $schemaKey Key schema definition + * @param string|null $schemaKey Key schema definition * * @return AvroEncoder * @throws ParameterException - * @throws AvroIOException|AvroSchemaParseException */ private static function createAvroEncoder( string $topic, @@ -429,4 +483,241 @@ public static function logCallback(mixed $kafka, int $level, string $facility, s ); } } -} \ No newline at end of file + + /** + * Setup and return Kafka consumer + * + * @param string $topic Topic name + * @param int $schemaType Schema type constant + * @param string|null $consumerGroup Consumer group name + * @param array $additionalConfig Additional Kafka configuration + * + * @return KafkaConsumerInterface + * @throws ParameterException + * @throws \AvroIOException + */ + public static function consumeOn( + string $topic, + int $schemaType, + ?string $consumerGroup = null, + array $additionalConfig = [] + ): KafkaConsumerInterface { + self::validateConsumerEnvironment(); + self::$currentTopic = $topic; + + $consumerBuilder = self::createConsumerBuilder($topic, $consumerGroup, $additionalConfig); + + if ($schemaType === self::AVRO_SCHEMA) { + $decoder = self::createAvroDecoder($topic); + $consumerBuilder->withDecoder($decoder); + } elseif ($schemaType === self::JSON_SCHEMA) { + $decoder = new JsonDecoder(); + $consumerBuilder->withDecoder($decoder); + } + + self::$consumer = $consumerBuilder->build(); + self::$consumer->subscribe(); + + Log::runtime()->info( + [ + 'operation' => 'kafka_consumer_initialized', + 'topic' => $topic, + 'consumerGroup' => $consumerGroup, + 'schemaType' => $schemaType + ] + ); + + return self::$consumer; + } + + /** + * Consume single message + * + * @param int $timeoutMs Timeout in milliseconds + * + * @return KafkaConsumerMessageInterface + * @throws ParameterException + */ + public static function consume(int $timeoutMs = 10000): KafkaConsumerMessageInterface + { + if (self::$consumer === null) { + throw new ParameterException('Consumer not initialized. Call consumeOn() first.'); + } + + return self::$consumer->consume($timeoutMs); + } + + /** + * Commit message offset + * + * @param mixed $message Message to commit + * + * @return void + * @throws ParameterException + */ + public static function commit(mixed $message): void + { + if (self::$consumer === null) { + throw new ParameterException('Consumer not initialized. Call consumeOn() first.'); + } + + self::$consumer->commit($message); + } + + /** + * Validate required consumer environment variables + * + * @return void + * @throws ParameterException + */ + private static function validateConsumerEnvironment(): void + { + $required = [ + 'KAFKA_BROKERS_URL' => 'Kafka brokers URL', + 'KAFKA_USER_CONSUME' => 'Kafka consumer username', + 'KAFKA_PASS_CONSUME' => 'Kafka consumer password' + ]; + + foreach ($required as $env => $description) { + if (empty(env($env))) { + throw new ParameterException("Environment variable {$env} ({$description}) is not set"); + } + } + } + + /** + * Subscribe to topics + * + * @return void + * @throws ParameterException + */ + public static function subscribe(): void + { + if (self::$consumer === null) { + throw new ParameterException('Consumer not initialized. Call consumeOn() first.'); + } + + self::$consumer->subscribe(); + + Log::runtime()->info( + [ + 'operation' => 'kafka_consumer_subscribed', + 'topic' => self::$currentTopic + ] + ); + } + + /** + * Close consumer and cleanup resources + * + * @return void + */ + public static function closeConsumer(): void + { + if (self::$consumer !== null) { + self::$consumer->unsubscribe(); + self::$consumer = null; + self::$currentTopic = null; + + Log::runtime()->info( + [ + 'operation' => 'kafka_consumer_closed' + ] + ); + } + } + + /** + * Create consumer builder with default configuration + * + * @param string $topic Topic name + * @param string|null $consumerGroup Consumer group + * @param array $additionalConfig Additional configuration + * + * @return KafkaConsumerBuilderInterface + */ + private static function createConsumerBuilder( + string $topic, + ?string $consumerGroup, + array $additionalConfig + ): KafkaConsumerBuilderInterface { + $groupName = $consumerGroup ?? $topic . '_consumer_group'; + + $defaultConfig = [ + 'client.id' => env('APP_NAME') . '-' . gethostname(), + 'compression.codec' => 'lz4', + 'sasl.username' => env('KAFKA_USER_CONSUME'), + 'sasl.password' => env('KAFKA_PASS_CONSUME'), + 'sasl.mechanism' => 'PLAIN', + 'security.protocol' => 'SASL_SSL', + 'enable.auto.commit' => false, + 'message.timeout.ms' => '10000', + 'socket.timeout.ms' => '10000', + 'request.timeout.ms' => '60000' + ]; + + $config = array_merge($defaultConfig, $additionalConfig); + + return KafkaConsumerBuilder::create() + ->withAdditionalConfig($config) + ->withAdditionalBroker((string) env('KAFKA_BROKERS_URL')) + ->withConsumerGroup($groupName) + ->withAdditionalSubscription($topic, [], KafkaConsumerBuilderInterface::OFFSET_STORED) + ->withErrorCallback([self::class, 'errorCallback']) + ->withRebalanceCallback([self::class, 'rebalanceCallback']) + ->withConsumeCallback([self::class, 'consumeCallback']) + ->withLogCallback([self::class, 'logCallback']) + ->withOffsetCommitCallback([self::class, 'offsetCommitCallback']); + } + + /** + * Create Avro decoder with schema registry + * + * @param string $topic Topic name + * + * @return AvroDecoder + * @throws ParameterException + * @throws \AvroIOException + */ + private static function createAvroDecoder(string $topic): AvroDecoder + { + if (empty(env('KAFKA_SCHEMA_REGISTRY_URL'))) { + throw new ParameterException('Environment variable KAFKA_SCHEMA_REGISTRY_URL is not set'); + } + + $cachedRegistry = new CachedRegistry( + new BlockingRegistry( + new PromisingRegistry( + new GuzzleClient( + [ + 'base_uri' => env('KAFKA_SCHEMA_REGISTRY_URL'), + 'auth' => [env('KAFKA_USER_CONSUME'), env('KAFKA_PASS_CONSUME')] + ] + ) + ) + ), + new AvroObjectCacheAdapter() + ); + + $registry = new AvroSchemaRegistry($cachedRegistry); + $recordSerializer = new RecordSerializer($cachedRegistry); + + // Fetch schemas from registry automatically + $registry->addBodySchemaMappingForTopic( + $topic, + new KafkaAvroSchema( + $topic . '-value', + KafkaAvroSchemaInterface::LATEST_VERSION + ) + ); + $registry->addKeySchemaMappingForTopic( + $topic, + new KafkaAvroSchema( + $topic . '-key', + KafkaAvroSchemaInterface::LATEST_VERSION + ) + ); + + return new AvroDecoder($registry, $recordSerializer); + } +} From d8f1af6918a19446f0522f087cb203dde4c35062 Mon Sep 17 00:00:00 2001 From: Mufthi Ryanda <77824812+mufthiryanda@users.noreply.github.com> Date: Thu, 26 Feb 2026 09:52:59 +0700 Subject: [PATCH 05/11] feat : change into non static method --- src/Libraries/Kafka.php | 155 ++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 77 deletions(-) diff --git a/src/Libraries/Kafka.php b/src/Libraries/Kafka.php index 26cbe1c..e1b1a8d 100644 --- a/src/Libraries/Kafka.php +++ b/src/Libraries/Kafka.php @@ -42,7 +42,8 @@ /** * Kafka * - * Kafka producer client for publishing messages with schema support + * Kafka producer/consumer client for publishing and consuming messages with + * schema support. Use the Kafka() helper to retrieve an instance. * * @category Library * @package Libraries @@ -72,21 +73,21 @@ class Kafka * * @var KafkaProducer|null */ - private static ?KafkaProducer $producer = null; + private ?KafkaProducer $producer = null; /** * Current consumer instance * * @var KafkaConsumerInterface|null */ - private static ?KafkaConsumerInterface $consumer = null; + private ?KafkaConsumerInterface $consumer = null; /** * Current topic name * * @var string|null */ - private static ?string $currentTopic = null; + private ?string $currentTopic = null; /** * Setup and return Kafka producer @@ -99,27 +100,27 @@ class Kafka * * @return KafkaProducer */ - public static function publishOn( + public function publishOn( string $topic, int $schemaType, ?string $schemaBody = null, ?string $schemaKey = null, array $additionalConfig = [] ): KafkaProducer { - self::validateEnvironment(); - self::$currentTopic = $topic; + $this->validateEnvironment(); + $this->currentTopic = $topic; - $producerBuilder = self::createProducerBuilder($additionalConfig); + $producerBuilder = $this->createProducerBuilder($additionalConfig); if ($schemaType === self::AVRO_SCHEMA) { - $encoder = self::createAvroEncoder($topic, $schemaBody, $schemaKey); + $encoder = $this->createAvroEncoder($topic, $schemaBody, $schemaKey); $producerBuilder->withEncoder($encoder); } elseif ($schemaType === self::JSON_SCHEMA) { $encoder = new JsonEncoder(); $producerBuilder->withEncoder($encoder); } - self::$producer = $producerBuilder->build(); + $this->producer = $producerBuilder->build(); Log::runtime()->info( [ @@ -129,7 +130,7 @@ public static function publishOn( ] ); - return self::$producer; + return $this->producer; } /** @@ -142,18 +143,18 @@ public static function publishOn( * @return void * @throws ParameterException */ - public static function produce(mixed $body, ?string $key = null, int $partition = 0): void + public function produce(mixed $body, ?string $key = null, int $partition = 0): void { - self::ensureProducerInitialized(); + $this->ensureProducerInitialized(); - $message = KafkaProducerMessage::create(self::$currentTopic, $partition) + $message = KafkaProducerMessage::create($this->currentTopic, $partition) ->withBody($body); if ($key !== null) { $message = $message->withKey($key); } - self::$producer->produce($message); + $this->producer->produce($message); } /** @@ -167,15 +168,15 @@ public static function produce(mixed $body, ?string $key = null, int $partition * @return void * @throws ParameterException */ - public static function produceWithHeaders( + public function produceWithHeaders( mixed $body, array $headers, ?string $key = null, int $partition = 0 ): void { - self::ensureProducerInitialized(); + $this->ensureProducerInitialized(); - $message = KafkaProducerMessage::create(self::$currentTopic, $partition) + $message = KafkaProducerMessage::create($this->currentTopic, $partition) ->withBody($body) ->withHeaders($headers); @@ -183,7 +184,7 @@ public static function produceWithHeaders( $message = $message->withKey($key); } - self::$producer->produce($message); + $this->producer->produce($message); } /** @@ -194,9 +195,9 @@ public static function produceWithHeaders( * @return void * @throws ParameterException */ - public static function produceBatch(array $messages): void + public function produceBatch(array $messages): void { - self::ensureProducerInitialized(); + $this->ensureProducerInitialized(); foreach ($messages as $msg) { $body = $msg['body'] ?? null; @@ -207,7 +208,7 @@ public static function produceBatch(array $messages): void continue; } - self::produce($body, $key, $partition); + $this->produce($body, $key, $partition); } } @@ -219,18 +220,18 @@ public static function produceBatch(array $messages): void * @return void * @throws ParameterException */ - public static function flush(int $timeoutMs = 10000): void + public function flush(int $timeoutMs = 10000): void { - self::ensureProducerInitialized(); + $this->ensureProducerInitialized(); $startTime = microtime(true); - self::$producer->flush($timeoutMs); + $this->producer->flush($timeoutMs); $elapsed = microtime(true) - $startTime; Log::runtime()->info( [ 'operation' => 'kafka_flush', - 'topic' => self::$currentTopic, + 'topic' => $this->currentTopic, 'responseTime' => round($elapsed * 1000) ] ); @@ -243,12 +244,12 @@ public static function flush(int $timeoutMs = 10000): void * * @return void */ - public static function close(int $timeoutMs = 10000): void + public function close(int $timeoutMs = 10000): void { - if (self::$producer !== null) { - self::flush($timeoutMs); - self::$producer = null; - self::$currentTopic = null; + if ($this->producer !== null) { + $this->flush($timeoutMs); + $this->producer = null; + $this->currentTopic = null; Log::runtime()->info( [ @@ -273,7 +274,7 @@ public static function close(int $timeoutMs = 10000): void * @return void * @throws ParameterException */ - public static function publish( + public function publish( string $topic, mixed $body, int $schemaType = self::SCHEMALESS, @@ -283,9 +284,9 @@ public static function publish( int $partition = 0, int $flushTimeoutMs = 10000 ): void { - self::publishOn($topic, $schemaType, $schemaBody, $schemaKey); - self::produce($body, $key, $partition); - self::flush($flushTimeoutMs); + $this->publishOn($topic, $schemaType, $schemaBody, $schemaKey); + $this->produce($body, $key, $partition); + $this->flush($flushTimeoutMs); } /** @@ -294,7 +295,7 @@ public static function publish( * @return void * @throws ParameterException */ - private static function validateEnvironment(): void + private function validateEnvironment(): void { $required = [ 'KAFKA_BROKERS_URL' => 'Kafka brokers URL', @@ -316,7 +317,7 @@ private static function validateEnvironment(): void * * @return KafkaProducerBuilder */ - private static function createProducerBuilder(array $additionalConfig): KafkaProducerBuilder + private function createProducerBuilder(array $additionalConfig): KafkaProducerBuilder { $defaultConfig = [ 'compression.codec' => 'lz4', @@ -333,9 +334,9 @@ private static function createProducerBuilder(array $additionalConfig): KafkaPro return KafkaProducerBuilder::create() ->withAdditionalConfig($config) ->withAdditionalBroker(env('KAFKA_BROKERS_URL')) - ->withDeliveryReportCallback([self::class, 'deliveryReportCallback']) - ->withErrorCallback([self::class, 'errorCallback']) - ->withLogCallback([self::class, 'logCallback']); + ->withDeliveryReportCallback([$this, 'deliveryReportCallback']) + ->withErrorCallback([$this, 'errorCallback']) + ->withLogCallback([$this, 'logCallback']); } /** @@ -348,7 +349,7 @@ private static function createProducerBuilder(array $additionalConfig): KafkaPro * @return AvroEncoder * @throws ParameterException */ - private static function createAvroEncoder( + private function createAvroEncoder( string $topic, ?string $schemaBody, ?string $schemaKey @@ -411,9 +412,9 @@ private static function createAvroEncoder( * @return void * @throws ParameterException */ - private static function ensureProducerInitialized(): void + private function ensureProducerInitialized(): void { - if (self::$producer === null) { + if ($this->producer === null) { throw new ParameterException('Producer not initialized. Call publishOn() first.'); } } @@ -426,7 +427,7 @@ private static function ensureProducerInitialized(): void * * @return void */ - public static function deliveryReportCallback(mixed $kafka, mixed $message): void + public function deliveryReportCallback(mixed $kafka, mixed $message): void { if ($message->err !== 0) { Log::runtime()->error( @@ -449,7 +450,7 @@ public static function deliveryReportCallback(mixed $kafka, mixed $message): voi * * @return void */ - public static function errorCallback(mixed $kafka, int $err, string $reason): void + public function errorCallback(mixed $kafka, int $err, string $reason): void { Log::runtime()->error( [ @@ -470,7 +471,7 @@ public static function errorCallback(mixed $kafka, int $err, string $reason): vo * * @return void */ - public static function logCallback(mixed $kafka, int $level, string $facility, string $message): void + public function logCallback(mixed $kafka, int $level, string $facility, string $message): void { if ($level <= 3) { Log::runtime()->warning( @@ -496,27 +497,27 @@ public static function logCallback(mixed $kafka, int $level, string $facility, s * @throws ParameterException * @throws \AvroIOException */ - public static function consumeOn( + public function consumeOn( string $topic, int $schemaType, ?string $consumerGroup = null, array $additionalConfig = [] ): KafkaConsumerInterface { - self::validateConsumerEnvironment(); - self::$currentTopic = $topic; + $this->validateConsumerEnvironment(); + $this->currentTopic = $topic; - $consumerBuilder = self::createConsumerBuilder($topic, $consumerGroup, $additionalConfig); + $consumerBuilder = $this->createConsumerBuilder($topic, $consumerGroup, $additionalConfig); if ($schemaType === self::AVRO_SCHEMA) { - $decoder = self::createAvroDecoder($topic); + $decoder = $this->createAvroDecoder($topic); $consumerBuilder->withDecoder($decoder); } elseif ($schemaType === self::JSON_SCHEMA) { $decoder = new JsonDecoder(); $consumerBuilder->withDecoder($decoder); } - self::$consumer = $consumerBuilder->build(); - self::$consumer->subscribe(); + $this->consumer = $consumerBuilder->build(); + $this->consumer->subscribe(); Log::runtime()->info( [ @@ -527,7 +528,7 @@ public static function consumeOn( ] ); - return self::$consumer; + return $this->consumer; } /** @@ -538,13 +539,13 @@ public static function consumeOn( * @return KafkaConsumerMessageInterface * @throws ParameterException */ - public static function consume(int $timeoutMs = 10000): KafkaConsumerMessageInterface + public function consume(int $timeoutMs = 10000): KafkaConsumerMessageInterface { - if (self::$consumer === null) { + if ($this->consumer === null) { throw new ParameterException('Consumer not initialized. Call consumeOn() first.'); } - return self::$consumer->consume($timeoutMs); + return $this->consumer->consume($timeoutMs); } /** @@ -555,13 +556,13 @@ public static function consume(int $timeoutMs = 10000): KafkaConsumerMessageInte * @return void * @throws ParameterException */ - public static function commit(mixed $message): void + public function commit(mixed $message): void { - if (self::$consumer === null) { + if ($this->consumer === null) { throw new ParameterException('Consumer not initialized. Call consumeOn() first.'); } - self::$consumer->commit($message); + $this->consumer->commit($message); } /** @@ -570,7 +571,7 @@ public static function commit(mixed $message): void * @return void * @throws ParameterException */ - private static function validateConsumerEnvironment(): void + private function validateConsumerEnvironment(): void { $required = [ 'KAFKA_BROKERS_URL' => 'Kafka brokers URL', @@ -591,18 +592,18 @@ private static function validateConsumerEnvironment(): void * @return void * @throws ParameterException */ - public static function subscribe(): void + public function subscribe(): void { - if (self::$consumer === null) { + if ($this->consumer === null) { throw new ParameterException('Consumer not initialized. Call consumeOn() first.'); } - self::$consumer->subscribe(); + $this->consumer->subscribe(); Log::runtime()->info( [ 'operation' => 'kafka_consumer_subscribed', - 'topic' => self::$currentTopic + 'topic' => $this->currentTopic ] ); } @@ -612,12 +613,12 @@ public static function subscribe(): void * * @return void */ - public static function closeConsumer(): void + public function closeConsumer(): void { - if (self::$consumer !== null) { - self::$consumer->unsubscribe(); - self::$consumer = null; - self::$currentTopic = null; + if ($this->consumer !== null) { + $this->consumer->unsubscribe(); + $this->consumer = null; + $this->currentTopic = null; Log::runtime()->info( [ @@ -636,7 +637,7 @@ public static function closeConsumer(): void * * @return KafkaConsumerBuilderInterface */ - private static function createConsumerBuilder( + private function createConsumerBuilder( string $topic, ?string $consumerGroup, array $additionalConfig @@ -663,11 +664,11 @@ private static function createConsumerBuilder( ->withAdditionalBroker((string) env('KAFKA_BROKERS_URL')) ->withConsumerGroup($groupName) ->withAdditionalSubscription($topic, [], KafkaConsumerBuilderInterface::OFFSET_STORED) - ->withErrorCallback([self::class, 'errorCallback']) - ->withRebalanceCallback([self::class, 'rebalanceCallback']) - ->withConsumeCallback([self::class, 'consumeCallback']) - ->withLogCallback([self::class, 'logCallback']) - ->withOffsetCommitCallback([self::class, 'offsetCommitCallback']); + ->withErrorCallback([$this, 'errorCallback']) + ->withRebalanceCallback([$this, 'rebalanceCallback']) + ->withConsumeCallback([$this, 'consumeCallback']) + ->withLogCallback([$this, 'logCallback']) + ->withOffsetCommitCallback([$this, 'offsetCommitCallback']); } /** @@ -679,7 +680,7 @@ private static function createConsumerBuilder( * @throws ParameterException * @throws \AvroIOException */ - private static function createAvroDecoder(string $topic): AvroDecoder + private function createAvroDecoder(string $topic): AvroDecoder { if (empty(env('KAFKA_SCHEMA_REGISTRY_URL'))) { throw new ParameterException('Environment variable KAFKA_SCHEMA_REGISTRY_URL is not set'); From 2bb2fd792bbfbac9993077994e0b34c417201e36 Mon Sep 17 00:00:00 2001 From: Mufthi Ryanda <77824812+mufthiryanda@users.noreply.github.com> Date: Thu, 26 Feb 2026 10:06:20 +0700 Subject: [PATCH 06/11] fix : fix env --- src/Libraries/Kafka.php | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Libraries/Kafka.php b/src/Libraries/Kafka.php index e1b1a8d..1249d36 100644 --- a/src/Libraries/Kafka.php +++ b/src/Libraries/Kafka.php @@ -354,8 +354,8 @@ private function createAvroEncoder( ?string $schemaBody, ?string $schemaKey ): AvroEncoder { - if (empty(env('KAFKA_SCHEMA_REGISTRY_URL'))) { - throw new ParameterException('Environment variable KAFKA_SCHEMA_REGISTRY_URL is not set'); + if (empty(env('KAFKA_SCHEME_REGISTRY_URL'))) { + throw new ParameterException('Environment variable KAFKA_SCHEME_REGISTRY_URL is not set'); } $cachedRegistry = new CachedRegistry( @@ -363,7 +363,7 @@ private function createAvroEncoder( new PromisingRegistry( new GuzzleClient( [ - 'base_uri' => env('KAFKA_SCHEMA_REGISTRY_URL'), + 'base_uri' => env('KAFKA_SCHEME_REGISTRY_URL'), 'auth' => [env('KAFKA_USER_PRODUCE'), env('KAFKA_PASS_PRODUCE')] ] ) @@ -682,8 +682,8 @@ private function createConsumerBuilder( */ private function createAvroDecoder(string $topic): AvroDecoder { - if (empty(env('KAFKA_SCHEMA_REGISTRY_URL'))) { - throw new ParameterException('Environment variable KAFKA_SCHEMA_REGISTRY_URL is not set'); + if (empty(env('KAFKA_SCHEME_REGISTRY_URL'))) { + throw new ParameterException('Environment variable KAFKA_SCHEME_REGISTRY_URL is not set'); } $cachedRegistry = new CachedRegistry( @@ -691,7 +691,7 @@ private function createAvroDecoder(string $topic): AvroDecoder new PromisingRegistry( new GuzzleClient( [ - 'base_uri' => env('KAFKA_SCHEMA_REGISTRY_URL'), + 'base_uri' => env('KAFKA_SCHEME_REGISTRY_URL'), 'auth' => [env('KAFKA_USER_CONSUME'), env('KAFKA_PASS_CONSUME')] ] ) From 8faaaca906eb73a1fd2930087572e52b162b0baf Mon Sep 17 00:00:00 2001 From: Mufthi Ryanda <77824812+mufthiryanda@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:32:21 +0700 Subject: [PATCH 07/11] feat : add auto decoder --- src/Libraries/Kafka.php | 86 ++++++++++++++++++++++++++---- src/Libraries/KafkaAutoDecoder.php | 70 ++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 9 deletions(-) create mode 100644 src/Libraries/KafkaAutoDecoder.php diff --git a/src/Libraries/Kafka.php b/src/Libraries/Kafka.php index 1249d36..2c6c0f0 100644 --- a/src/Libraries/Kafka.php +++ b/src/Libraries/Kafka.php @@ -68,6 +68,12 @@ class Kafka */ public const AVRO_SCHEMA = 3; + /** + * Schema type: Auto-detect -> Avro if magic byte (\x00) present, else raw + */ + public const AUTO_SCHEMA = 4; + + /** * Current producer instance * @@ -114,10 +120,10 @@ public function publishOn( if ($schemaType === self::AVRO_SCHEMA) { $encoder = $this->createAvroEncoder($topic, $schemaBody, $schemaKey); - $producerBuilder->withEncoder($encoder); + $producerBuilder = $producerBuilder->withEncoder($encoder); } elseif ($schemaType === self::JSON_SCHEMA) { $encoder = new JsonEncoder(); - $producerBuilder->withEncoder($encoder); + $producerBuilder = $producerBuilder->withEncoder($encoder); } $this->producer = $producerBuilder->build(); @@ -510,10 +516,12 @@ public function consumeOn( if ($schemaType === self::AVRO_SCHEMA) { $decoder = $this->createAvroDecoder($topic); - $consumerBuilder->withDecoder($decoder); + $consumerBuilder = $consumerBuilder->withDecoder($decoder); } elseif ($schemaType === self::JSON_SCHEMA) { - $decoder = new JsonDecoder(); - $consumerBuilder->withDecoder($decoder); + $consumerBuilder = $consumerBuilder->withDecoder(new JsonDecoder()); + } elseif ($schemaType === self::AUTO_SCHEMA) { + $avroDecoder = $this->createAvroDecoder($topic); + $consumerBuilder = $consumerBuilder->withDecoder(new KafkaAutoDecoder($avroDecoder)); } $this->consumer = $consumerBuilder->build(); @@ -534,15 +542,19 @@ public function consumeOn( /** * Consume single message * - * @param int $timeoutMs Timeout in milliseconds + * @param int $timeoutMs Timeout in milliseconds + * @param ?string $topic Topic * * @return KafkaConsumerMessageInterface - * @throws ParameterException */ - public function consume(int $timeoutMs = 10000): KafkaConsumerMessageInterface + public function consume(int $timeoutMs = 10000, ?string $topic = null): KafkaConsumerMessageInterface { if ($this->consumer === null) { - throw new ParameterException('Consumer not initialized. Call consumeOn() first.'); + if ($topic === null) { + throw new ParameterException('Consumer not initialized. Call consumeOn() first or provide a topic.'); + } + // auto-bind with AUTO_SCHEMA + $this->consumeOn($topic, self::AUTO_SCHEMA); } return $this->consumer->consume($timeoutMs); @@ -721,4 +733,60 @@ private function createAvroDecoder(string $topic): AvroDecoder return new AvroDecoder($registry, $recordSerializer); } + + /** + * Rebalance callback + * + * @param mixed $kafka Kafka instance + * @param int $err Error code + * @param array $partitions Partitions + * + * @return void + */ + public function rebalanceCallback(mixed $kafka, int $err, array $partitions): void + { + $assignPartitions = defined('RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS') + ? RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS // Defined on C-level constant, comes from the `ext-rdkafka` + : -175; + + if ($err === $assignPartitions) { + $kafka->assign($partitions); + } else { + $kafka->assign(null); + } + } + + /** + * Consume callback + * + * @param mixed $message Message + * @param mixed $ctx Context + * + * @return void + */ + public function consumeCallback(mixed $message, mixed $ctx): void + { + // no-op: messages are handled by the caller via consume() + } + + /** + * Offset commit callback + * + * @param mixed $kafka Kafka instance + * @param int $err Error code + * @param array $topicPartitions Topic partitions + * + * @return void + */ + public function offsetCommitCallback(mixed $kafka, int $err, array $topicPartitions): void + { + if ($err !== 0) { + Log::runtime()->error( + [ + 'operation' => 'kafka_offset_commit_failed', + 'errorCode' => $err + ] + ); + } + } } diff --git a/src/Libraries/KafkaAutoDecoder.php b/src/Libraries/KafkaAutoDecoder.php new file mode 100644 index 0000000..abda022 --- /dev/null +++ b/src/Libraries/KafkaAutoDecoder.php @@ -0,0 +1,70 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries; + +use Jobcloud\Kafka\Message\Decoder\AvroDecoder; +use Jobcloud\Kafka\Message\Decoder\DecoderInterface; +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; + +/** + * KafkaAutoDecoder + * + * Smart decoder that auto-detects Avro wire format (magic byte 0x00) and + * falls back to raw/schemaless if the byte is absent or decoding fails. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class KafkaAutoDecoder implements DecoderInterface +{ + /** + * KafkaAutoDecoder constructor + * + * @param AvroDecoder $avroDecoder Avro decoder instance backed by schema registry + */ + public function __construct(private AvroDecoder $avroDecoder) + { + } + + /** + * Decode a Kafka consumer message + * + * Attempts Avro decoding when the message body starts with the Avro magic + * byte (0x00). Falls back to returning the raw message as-is for schemaless + * messages or when Avro decoding fails. + * + * @param KafkaConsumerMessageInterface $consumerMessage Incoming Kafka message + * + * @return KafkaConsumerMessageInterface + */ + public function decode(KafkaConsumerMessageInterface $consumerMessage): KafkaConsumerMessageInterface + { + $body = $consumerMessage->getBody(); + + if (is_string($body) && strlen($body) > 5 && ord($body[0]) === 0) { + try { + return $this->avroDecoder->decode($consumerMessage); + } catch (\Throwable) { + // fallback to raw schemaless + } + } + + return $consumerMessage; + } +} From a6394adb28db82aeae4cddcfe8708df0d9ee2147 Mon Sep 17 00:00:00 2001 From: Mufthi Ryanda <77824812+mufthiryanda@users.noreply.github.com> Date: Fri, 27 Mar 2026 14:45:40 +0700 Subject: [PATCH 08/11] feat : add completed kafka --- src/Libraries/{ => Kafka}/Kafka.php | 2 +- .../{ => Kafka}/KafkaAutoDecoder.php | 24 +++- src/Libraries/Kafka/KafkaDecodedMessage.php | 116 ++++++++++++++++++ 3 files changed, 135 insertions(+), 7 deletions(-) rename src/Libraries/{ => Kafka}/Kafka.php (99%) rename src/Libraries/{ => Kafka}/KafkaAutoDecoder.php (69%) create mode 100644 src/Libraries/Kafka/KafkaDecodedMessage.php diff --git a/src/Libraries/Kafka.php b/src/Libraries/Kafka/Kafka.php similarity index 99% rename from src/Libraries/Kafka.php rename to src/Libraries/Kafka/Kafka.php index 2c6c0f0..d20834d 100644 --- a/src/Libraries/Kafka.php +++ b/src/Libraries/Kafka/Kafka.php @@ -13,7 +13,7 @@ declare(strict_types=1); -namespace Spotlibs\PhpLib\Libraries; +namespace Spotlibs\PhpLib\Libraries\Kafka; use AvroSchema; use FlixTech\AvroSerializer\Objects\RecordSerializer; diff --git a/src/Libraries/KafkaAutoDecoder.php b/src/Libraries/Kafka/KafkaAutoDecoder.php similarity index 69% rename from src/Libraries/KafkaAutoDecoder.php rename to src/Libraries/Kafka/KafkaAutoDecoder.php index abda022..c4f89b8 100644 --- a/src/Libraries/KafkaAutoDecoder.php +++ b/src/Libraries/Kafka/KafkaAutoDecoder.php @@ -13,7 +13,7 @@ declare(strict_types=1); -namespace Spotlibs\PhpLib\Libraries; +namespace Spotlibs\PhpLib\Libraries\Kafka; use Jobcloud\Kafka\Message\Decoder\AvroDecoder; use Jobcloud\Kafka\Message\Decoder\DecoderInterface; @@ -57,11 +57,23 @@ public function decode(KafkaConsumerMessageInterface $consumerMessage): KafkaCon { $body = $consumerMessage->getBody(); - if (is_string($body) && strlen($body) > 5 && ord($body[0]) === 0) { - try { - return $this->avroDecoder->decode($consumerMessage); - } catch (\Throwable) { - // fallback to raw schemaless + if (is_string($body) && strlen($body) > 5) { + // Avro: magic byte 0x00 + if (ord($body[0]) === 0) { + try { + return $this->avroDecoder->decode($consumerMessage); + } catch (\Throwable) { + // fallback to raw + } + } + + // JSON: starts with { or [ + $firstChar = ltrim($body)[0] ?? ''; + if ($firstChar === '{' || $firstChar === '[') { + $decoded = json_decode($body, true); + if (json_last_error() === JSON_ERROR_NONE) { + return new KafkaDecodedMessage($consumerMessage, $decoded); + } } } diff --git a/src/Libraries/Kafka/KafkaDecodedMessage.php b/src/Libraries/Kafka/KafkaDecodedMessage.php new file mode 100644 index 0000000..6288852 --- /dev/null +++ b/src/Libraries/Kafka/KafkaDecodedMessage.php @@ -0,0 +1,116 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; + +/** + * KafkaDecodedMessage + * + * Wraps a raw KafkaConsumerMessage and replaces its body with an already-decoded + * value (e.g. an associative array from JSON). All other accessors are delegated + * to the original message to preserve topic, partition, offset, and header info. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class KafkaDecodedMessage implements KafkaConsumerMessageInterface +{ + /** + * KafkaDecodedMessage constructor + * + * @param KafkaConsumerMessageInterface $original Original raw Kafka message + * @param mixed $decodedBody Pre-decoded message body + */ + public function __construct( + private KafkaConsumerMessageInterface $original, + private mixed $decodedBody + ) { + } + + /** + * Get the decoded message body + * + * @return mixed + */ + public function getBody(): mixed + { + return $this->decodedBody; + } + + /** + * Get the message key + * + * @return string|null + */ + public function getKey(): ?string + { + return $this->original->getKey(); + } + + /** + * Get the topic name + * + * @return string + */ + public function getTopicName(): string + { + return $this->original->getTopicName(); + } + + /** + * Get the partition number + * + * @return int + */ + public function getPartition(): int + { + return $this->original->getPartition(); + } + + /** + * Get the message headers + * + * @return array + */ + public function getHeaders(): array + { + return $this->original->getHeaders(); + } + + /** + * Get the message offset + * + * @return int + */ + public function getOffset(): int + { + return $this->original->getOffset(); + } + + /** + * Get the message timestamp + * + * @return int + */ + public function getTimestamp(): int + { + return $this->original->getTimestamp(); + } +} From 3a3ca67d6eebbc07501d25f15ed1ef7655c4801d Mon Sep 17 00:00:00 2001 From: Mufthi Ryanda <77824812+mufthiryanda@users.noreply.github.com> Date: Wed, 20 May 2026 15:19:10 +0700 Subject: [PATCH 09/11] feat : add json scheme support --- src/Libraries/Kafka/JsonSchemaDecoder.php | 106 +++++++++++ src/Libraries/Kafka/JsonSchemaEncoder.php | 211 +++++++++++++++++++++ src/Libraries/Kafka/JsonSchemaRegistry.php | 139 ++++++++++++++ src/Libraries/Kafka/Kafka.php | 76 +++++++- src/Libraries/Kafka/KafkaAutoDecoder.php | 43 ++++- src/Libraries/Kafka/KafkaJsonSchema.php | 92 +++++++++ 6 files changed, 652 insertions(+), 15 deletions(-) create mode 100644 src/Libraries/Kafka/JsonSchemaDecoder.php create mode 100644 src/Libraries/Kafka/JsonSchemaEncoder.php create mode 100644 src/Libraries/Kafka/JsonSchemaRegistry.php create mode 100644 src/Libraries/Kafka/KafkaJsonSchema.php diff --git a/src/Libraries/Kafka/JsonSchemaDecoder.php b/src/Libraries/Kafka/JsonSchemaDecoder.php new file mode 100644 index 0000000..0b88e5c --- /dev/null +++ b/src/Libraries/Kafka/JsonSchemaDecoder.php @@ -0,0 +1,106 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +use Jobcloud\Kafka\Message\Decoder\DecoderInterface; +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; +use Spotlibs\PhpLib\Exceptions\ParameterException; + +/** + * JsonSchemaDecoder + * + * Decodes Kafka consumer messages encoded with JSON Schema Confluent wire format. + * Strips the wire format header (magic byte + schema ID), fetches the schema from + * the registry, validates the JSON payload, and returns the decoded array. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class JsonSchemaDecoder implements DecoderInterface +{ + /** + * JsonSchemaDecoder constructor + * + * @param JsonSchemaRegistry $registry Schema registry client + */ + public function __construct(private JsonSchemaRegistry $registry) + { + } + + /** + * Decode a Kafka consumer message with JSON Schema wire format + * + * @param KafkaConsumerMessageInterface $consumerMessage Incoming Kafka message + * + * @return KafkaConsumerMessageInterface Decoded message with array body + * @throws ParameterException + */ + public function decode(KafkaConsumerMessageInterface $consumerMessage): KafkaConsumerMessageInterface + { + $body = $consumerMessage->getBody(); + + if (!is_string($body) || strlen($body) <= 5) { + return $consumerMessage; + } + + if (ord($body[0]) !== 0) { + return $consumerMessage; + } + + $schemaId = unpack('N', substr($body, 1, 4))[1]; + $jsonPayload = substr($body, 5); + + // Fetch schema for validation + $this->registry->getSchemaById($schemaId); + + $decoded = json_decode($jsonPayload, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + throw new ParameterException('JSON Schema decode failed: invalid JSON payload'); + } + + return new KafkaDecodedMessage($consumerMessage, $decoded); + } + + /** + * Attempt to decode a wire-format message as JSON Schema + * + * Returns null if the payload after the header is not valid JSON, + * indicating it may be Avro-encoded instead. + * + * @param string $body Raw message body bytes + * + * @return array|null Decoded array or null if not valid JSON + */ + public static function tryDecode(string $body): ?array + { + if (strlen($body) <= 5 || ord($body[0]) !== 0) { + return null; + } + + $jsonPayload = substr($body, 5); + $decoded = json_decode($jsonPayload, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + return null; + } + + return $decoded; + } +} diff --git a/src/Libraries/Kafka/JsonSchemaEncoder.php b/src/Libraries/Kafka/JsonSchemaEncoder.php new file mode 100644 index 0000000..bc15529 --- /dev/null +++ b/src/Libraries/Kafka/JsonSchemaEncoder.php @@ -0,0 +1,211 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +use Jobcloud\Kafka\Message\Encoder\EncoderInterface; +use Jobcloud\Kafka\Message\KafkaProducerMessageInterface; +use Spotlibs\PhpLib\Exceptions\ParameterException; + +/** + * JsonSchemaEncoder + * + * Encodes Kafka producer messages using JSON Schema with Confluent wire format. + * Registers the schema with the Schema Registry, validates the payload, and + * prepends the wire format header (magic byte + schema ID) to the JSON payload. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class JsonSchemaEncoder implements EncoderInterface +{ + /** + * Confluent wire format magic byte + */ + private const MAGIC_BYTE = "\x00"; + + /** + * JsonSchemaEncoder constructor + * + * @param JsonSchemaRegistry $registry Schema registry client + * @param KafkaJsonSchema $bodySchema Body schema definition + * @param KafkaJsonSchema|null $keySchema Key schema definition (optional) + */ + public function __construct( + private JsonSchemaRegistry $registry, + private KafkaJsonSchema $bodySchema, + private ?KafkaJsonSchema $keySchema = null + ) { + } + + /** + * Encode a Kafka producer message with JSON Schema wire format + * + * @param KafkaProducerMessageInterface $producerMessage Message to encode + * + * @return KafkaProducerMessageInterface Encoded message with wire format body + * @throws ParameterException + */ + public function encode(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface + { + $body = $producerMessage->getBody(); + $encodedBody = $this->encodePayload($body, $this->bodySchema); + $producerMessage = $producerMessage->withBody($encodedBody); + + if ($this->keySchema !== null && $producerMessage->getKey() !== null) { + $key = $producerMessage->getKey(); + $keyPayload = is_string($key) ? $key : json_encode($key); + $encodedKey = $this->encodeRawPayload($keyPayload, $this->keySchema); + $producerMessage = $producerMessage->withKey($encodedKey); + } + + return $producerMessage; + } + + /** + * Encode a payload value with schema validation and wire format + * + * @param mixed $payload Payload data (array or string) + * @param KafkaJsonSchema $schema Schema to validate and encode against + * + * @return string Wire-format encoded string + * @throws ParameterException + */ + private function encodePayload(mixed $payload, KafkaJsonSchema $schema): string + { + $jsonString = is_string($payload) ? $payload : json_encode($payload); + + $this->validatePayload($jsonString, $schema->getDefinition()); + + return $this->encodeRawPayload($jsonString, $schema); + } + + /** + * Prepend Confluent wire format header to a JSON payload + * + * @param string $jsonPayload JSON string payload + * @param KafkaJsonSchema $schema Schema (must have resolved ID) + * + * @return string Wire-format bytes + * @throws ParameterException + */ + private function encodeRawPayload(string $jsonPayload, KafkaJsonSchema $schema): string + { + $schemaId = $schema->getSchemaId(); + + if ($schemaId === null) { + $schemaId = $this->registry->register($schema->getSubject(), $schema->getDefinition()); + $schema->setSchemaId($schemaId); + } + + return self::MAGIC_BYTE . pack('N', $schemaId) . $jsonPayload; + } + + /** + * Validate a JSON payload against a JSON Schema definition + * + * @param string $jsonPayload JSON string to validate + * @param string $schemaDefinition JSON Schema definition string + * + * @return void + * @throws ParameterException + */ + private function validatePayload(string $jsonPayload, string $schemaDefinition): void + { + $data = json_decode($jsonPayload); + $schema = json_decode($schemaDefinition); + + if ($schema === null) { + throw new ParameterException('Invalid JSON Schema definition'); + } + + if ($data === null && $jsonPayload !== 'null') { + throw new ParameterException('Invalid JSON payload'); + } + + // Validate required fields if schema defines them + if (isset($schema->required) && is_array($schema->required) && is_object($data)) { + foreach ($schema->required as $field) { + if (!property_exists($data, $field)) { + throw new ParameterException("JSON Schema validation failed: missing required field '{$field}'"); + } + } + } + + // Validate property types if schema defines them + if (isset($schema->properties) && is_object($data)) { + foreach ($schema->properties as $prop => $propSchema) { + if (!property_exists($data, $prop)) { + continue; + } + $this->validateType($data->$prop, $propSchema, $prop); + } + } + } + + /** + * Validate a value against its declared JSON Schema type + * + * @param mixed $value Value to check + * @param object $propSchema Property schema definition + * @param string $field Field name for error messages + * + * @return void + * @throws ParameterException + */ + private function validateType(mixed $value, object $propSchema, string $field): void + { + if (!isset($propSchema->type)) { + return; + } + + $types = is_array($propSchema->type) ? $propSchema->type : [$propSchema->type]; + + foreach ($types as $type) { + if ($this->matchesType($value, $type)) { + return; + } + } + + throw new ParameterException( + "JSON Schema validation failed: field '{$field}' does not match type(s) " . json_encode($propSchema->type) + ); + } + + /** + * Check if a value matches a JSON Schema type + * + * @param mixed $value Value to check + * @param string $type JSON Schema type name + * + * @return bool + */ + private function matchesType(mixed $value, string $type): bool + { + return match ($type) { + 'string' => is_string($value), + 'integer' => is_int($value), + 'number' => is_int($value) || is_float($value), + 'boolean' => is_bool($value), + 'array' => is_array($value), + 'object' => is_object($value), + 'null' => $value === null, + default => true, + }; + } +} diff --git a/src/Libraries/Kafka/JsonSchemaRegistry.php b/src/Libraries/Kafka/JsonSchemaRegistry.php new file mode 100644 index 0000000..f14c1e6 --- /dev/null +++ b/src/Libraries/Kafka/JsonSchemaRegistry.php @@ -0,0 +1,139 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +use GuzzleHttp\Client as GuzzleClient; +use Spotlibs\PhpLib\Exceptions\ParameterException; + +/** + * JsonSchemaRegistry + * + * Manages JSON Schema registration and retrieval from Confluent Schema Registry. + * Handles the REST API calls for registering schemas under subjects and fetching + * schemas by ID. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class JsonSchemaRegistry +{ + /** + * In-memory cache of schema ID → schema definition + * + * @var array + */ + private array $schemaCache = []; + + /** + * JsonSchemaRegistry constructor + * + * @param GuzzleClient $client Guzzle HTTP client configured with base_uri and auth + */ + public function __construct(private GuzzleClient $client) + { + } + + /** + * Register a JSON Schema under a subject and return the schema ID + * + * @param string $subject Subject name (e.g. "topic-value") + * @param string $definition JSON Schema definition string + * + * @return int The schema ID assigned by the registry + * @throws ParameterException + */ + public function register(string $subject, string $definition): int + { + $payload = json_encode( + [ + 'schemaType' => 'JSON', + 'schema' => $definition, + ] + ); + + try { + $response = $this->client->post( + sprintf('subjects/%s/versions', $subject), + [ + 'headers' => [ + 'Content-Type' => 'application/vnd.schemaregistry.v1+json', + 'Accept' => 'application/vnd.schemaregistry.v1+json', + ], + 'body' => $payload, + ] + ); + + $body = json_decode($response->getBody()->getContents(), true); + + if (!isset($body['id'])) { + throw new ParameterException('Schema Registry did not return a schema ID'); + } + + $schemaId = (int) $body['id']; + $this->schemaCache[$schemaId] = $definition; + + return $schemaId; + } catch (ParameterException $e) { + throw $e; + } catch (\Throwable $e) { + throw new ParameterException('Failed to register JSON Schema: ' . $e->getMessage()); + } + } + + /** + * Fetch a JSON Schema definition by its schema ID + * + * @param int $schemaId Schema ID + * + * @return string JSON Schema definition string + * @throws ParameterException + */ + public function getSchemaById(int $schemaId): string + { + if (isset($this->schemaCache[$schemaId])) { + return $this->schemaCache[$schemaId]; + } + + try { + $response = $this->client->get( + sprintf('schemas/ids/%d', $schemaId), + [ + 'headers' => [ + 'Accept' => 'application/vnd.schemaregistry.v1+json', + ], + ] + ); + + $body = json_decode($response->getBody()->getContents(), true); + + if (!isset($body['schema'])) { + throw new ParameterException("Schema not found for ID {$schemaId}"); + } + + $definition = $body['schema']; + $this->schemaCache[$schemaId] = $definition; + + return $definition; + } catch (ParameterException $e) { + throw $e; + } catch (\Throwable $e) { + throw new ParameterException('Failed to fetch JSON Schema: ' . $e->getMessage()); + } + } +} diff --git a/src/Libraries/Kafka/Kafka.php b/src/Libraries/Kafka/Kafka.php index d20834d..529e4c0 100644 --- a/src/Libraries/Kafka/Kafka.php +++ b/src/Libraries/Kafka/Kafka.php @@ -26,9 +26,7 @@ use Jobcloud\Kafka\Consumer\KafkaConsumerBuilderInterface; use Jobcloud\Kafka\Consumer\KafkaConsumerInterface; use Jobcloud\Kafka\Message\Decoder\AvroDecoder; -use Jobcloud\Kafka\Message\Decoder\JsonDecoder; use Jobcloud\Kafka\Message\Encoder\AvroEncoder; -use Jobcloud\Kafka\Message\Encoder\JsonEncoder; use Jobcloud\Kafka\Message\KafkaAvroSchema; use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface; use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; @@ -122,7 +120,7 @@ public function publishOn( $encoder = $this->createAvroEncoder($topic, $schemaBody, $schemaKey); $producerBuilder = $producerBuilder->withEncoder($encoder); } elseif ($schemaType === self::JSON_SCHEMA) { - $encoder = new JsonEncoder(); + $encoder = $this->createJsonSchemaEncoder($topic, $schemaBody, $schemaKey); $producerBuilder = $producerBuilder->withEncoder($encoder); } @@ -412,6 +410,72 @@ private function createAvroEncoder( return new AvroEncoder($registry, $recordSerializer); } + /** + * Create JSON Schema encoder with schema registry + * + * @param string $topic Topic name + * @param string|null $schemaBody Body schema definition (JSON Schema string) + * @param string|null $schemaKey Key schema definition (JSON Schema string) + * + * @return JsonSchemaEncoder + * @throws ParameterException + */ + private function createJsonSchemaEncoder( + string $topic, + ?string $schemaBody, + ?string $schemaKey + ): JsonSchemaEncoder { + if (empty(env('KAFKA_SCHEME_REGISTRY_URL'))) { + throw new ParameterException('Environment variable KAFKA_SCHEME_REGISTRY_URL is not set'); + } + + if ($schemaBody === null) { + throw new ParameterException('JSON Schema body definition is required for JSON_SCHEMA mode'); + } + + $registry = new JsonSchemaRegistry( + new GuzzleClient( + [ + 'base_uri' => env('KAFKA_SCHEME_REGISTRY_URL'), + 'auth' => [env('KAFKA_USER_PRODUCE'), env('KAFKA_PASS_PRODUCE')] + ] + ) + ); + + $bodySchema = new KafkaJsonSchema($topic . '-value', $schemaBody); + + $keySchema = null; + if ($schemaKey !== null) { + $keySchema = new KafkaJsonSchema($topic . '-key', $schemaKey); + } + + return new JsonSchemaEncoder($registry, $bodySchema, $keySchema); + } + + /** + * Create JSON Schema decoder with schema registry + * + * @return JsonSchemaDecoder + * @throws ParameterException + */ + private function createJsonSchemaDecoder(): JsonSchemaDecoder + { + if (empty(env('KAFKA_SCHEME_REGISTRY_URL'))) { + throw new ParameterException('Environment variable KAFKA_SCHEME_REGISTRY_URL is not set'); + } + + $registry = new JsonSchemaRegistry( + new GuzzleClient( + [ + 'base_uri' => env('KAFKA_SCHEME_REGISTRY_URL'), + 'auth' => [env('KAFKA_USER_CONSUME'), env('KAFKA_PASS_CONSUME')] + ] + ) + ); + + return new JsonSchemaDecoder($registry); + } + /** * Ensure producer is initialized * @@ -518,10 +582,12 @@ public function consumeOn( $decoder = $this->createAvroDecoder($topic); $consumerBuilder = $consumerBuilder->withDecoder($decoder); } elseif ($schemaType === self::JSON_SCHEMA) { - $consumerBuilder = $consumerBuilder->withDecoder(new JsonDecoder()); + $decoder = $this->createJsonSchemaDecoder(); + $consumerBuilder = $consumerBuilder->withDecoder($decoder); } elseif ($schemaType === self::AUTO_SCHEMA) { $avroDecoder = $this->createAvroDecoder($topic); - $consumerBuilder = $consumerBuilder->withDecoder(new KafkaAutoDecoder($avroDecoder)); + $jsonSchemaDecoder = $this->createJsonSchemaDecoder(); + $consumerBuilder = $consumerBuilder->withDecoder(new KafkaAutoDecoder($avroDecoder, $jsonSchemaDecoder)); } $this->consumer = $consumerBuilder->build(); diff --git a/src/Libraries/Kafka/KafkaAutoDecoder.php b/src/Libraries/Kafka/KafkaAutoDecoder.php index c4f89b8..6fe39ba 100644 --- a/src/Libraries/Kafka/KafkaAutoDecoder.php +++ b/src/Libraries/Kafka/KafkaAutoDecoder.php @@ -22,8 +22,10 @@ /** * KafkaAutoDecoder * - * Smart decoder that auto-detects Avro wire format (magic byte 0x00) and - * falls back to raw/schemaless if the byte is absent or decoding fails. + * Smart decoder that auto-detects message encoding. When the message starts with + * the Confluent wire format magic byte (0x00), it first attempts JSON Schema + * decoding (payload bytes 5+ are valid JSON), then falls back to Avro decoding. + * Messages without the magic byte are attempted as plain JSON, or returned raw. * * @category Library * @package Libraries @@ -36,18 +38,25 @@ class KafkaAutoDecoder implements DecoderInterface /** * KafkaAutoDecoder constructor * - * @param AvroDecoder $avroDecoder Avro decoder instance backed by schema registry + * @param AvroDecoder $avroDecoder Avro decoder instance backed by schema registry + * @param JsonSchemaDecoder|null $jsonSchemaDecoder JSON Schema decoder instance (optional) */ - public function __construct(private AvroDecoder $avroDecoder) - { + public function __construct( + private AvroDecoder $avroDecoder, + private ?JsonSchemaDecoder $jsonSchemaDecoder = null + ) { } /** * Decode a Kafka consumer message * - * Attempts Avro decoding when the message body starts with the Avro magic - * byte (0x00). Falls back to returning the raw message as-is for schemaless - * messages or when Avro decoding fails. + * Attempts decoding in this order for wire-format messages (magic byte 0x00): + * 1. JSON Schema (payload after header is valid JSON text) + * 2. Avro (binary payload after header) + * + * For non-wire-format messages: + * 3. Plain JSON (starts with { or [) + * 4. Raw passthrough * * @param KafkaConsumerMessageInterface $consumerMessage Incoming Kafka message * @@ -58,8 +67,22 @@ public function decode(KafkaConsumerMessageInterface $consumerMessage): KafkaCon $body = $consumerMessage->getBody(); if (is_string($body) && strlen($body) > 5) { - // Avro: magic byte 0x00 + // Confluent wire format: magic byte 0x00 if (ord($body[0]) === 0) { + // Try JSON Schema first (payload is UTF-8 JSON text) + $jsonDecoded = JsonSchemaDecoder::tryDecode($body); + if ($jsonDecoded !== null) { + if ($this->jsonSchemaDecoder !== null) { + try { + return $this->jsonSchemaDecoder->decode($consumerMessage); + } catch (\Throwable) { + // Fall through to return basic decoded + } + } + return new KafkaDecodedMessage($consumerMessage, $jsonDecoded); + } + + // Fall back to Avro (binary payload) try { return $this->avroDecoder->decode($consumerMessage); } catch (\Throwable) { @@ -67,7 +90,7 @@ public function decode(KafkaConsumerMessageInterface $consumerMessage): KafkaCon } } - // JSON: starts with { or [ + // Plain JSON: starts with { or [ $firstChar = ltrim($body)[0] ?? ''; if ($firstChar === '{' || $firstChar === '[') { $decoded = json_decode($body, true); diff --git a/src/Libraries/Kafka/KafkaJsonSchema.php b/src/Libraries/Kafka/KafkaJsonSchema.php new file mode 100644 index 0000000..9dec190 --- /dev/null +++ b/src/Libraries/Kafka/KafkaJsonSchema.php @@ -0,0 +1,92 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.4.0 + * @link https://github.com/spotlibs + */ + +declare(strict_types=1); + +namespace Spotlibs\PhpLib\Libraries\Kafka; + +/** + * KafkaJsonSchema + * + * Holds a JSON Schema definition with its subject name and resolved schema ID + * from the Confluent Schema Registry. + * + * @category Library + * @package Libraries + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/spotlibs + */ +class KafkaJsonSchema +{ + /** + * Schema ID from registry (null until registered/fetched) + * + * @var int|null + */ + private ?int $schemaId = null; + + /** + * KafkaJsonSchema constructor + * + * @param string $subject Subject name in the registry (e.g. "topic-value") + * @param string $definition Raw JSON Schema definition string + */ + public function __construct( + private string $subject, + private string $definition + ) { + } + + /** + * Get the subject name + * + * @return string + */ + public function getSubject(): string + { + return $this->subject; + } + + /** + * Get the JSON Schema definition string + * + * @return string + */ + public function getDefinition(): string + { + return $this->definition; + } + + /** + * Get the resolved schema ID + * + * @return int|null + */ + public function getSchemaId(): ?int + { + return $this->schemaId; + } + + /** + * Set the resolved schema ID + * + * @param int $schemaId Schema ID from registry + * + * @return void + */ + public function setSchemaId(int $schemaId): void + { + $this->schemaId = $schemaId; + } +} From 6285e0d8c2fd57c96f0ad55d497d2ac6e298428c Mon Sep 17 00:00:00 2001 From: Mufthi Ryanda <77824812+mufthiryanda@users.noreply.github.com> Date: Thu, 21 May 2026 11:12:26 +0700 Subject: [PATCH 10/11] feat : add UT Kafka Lib --- tests/Libraries/ClientExternalTest.php | 7 +- .../Libraries/Kafka/JsonSchemaDecoderTest.php | 154 +++++++++++ .../Libraries/Kafka/JsonSchemaEncoderTest.php | 259 ++++++++++++++++++ .../Kafka/JsonSchemaRegistryTest.php | 164 +++++++++++ .../Libraries/Kafka/KafkaAutoDecoderTest.php | 232 ++++++++++++++++ .../Kafka/KafkaDecodedMessageTest.php | 112 ++++++++ tests/Libraries/Kafka/KafkaJsonSchemaTest.php | 63 +++++ 7 files changed, 989 insertions(+), 2 deletions(-) create mode 100644 tests/Libraries/Kafka/JsonSchemaDecoderTest.php create mode 100644 tests/Libraries/Kafka/JsonSchemaEncoderTest.php create mode 100644 tests/Libraries/Kafka/JsonSchemaRegistryTest.php create mode 100644 tests/Libraries/Kafka/KafkaAutoDecoderTest.php create mode 100644 tests/Libraries/Kafka/KafkaDecodedMessageTest.php create mode 100644 tests/Libraries/Kafka/KafkaJsonSchemaTest.php diff --git a/tests/Libraries/ClientExternalTest.php b/tests/Libraries/ClientExternalTest.php index 53fae2f..df83f8d 100644 --- a/tests/Libraries/ClientExternalTest.php +++ b/tests/Libraries/ClientExternalTest.php @@ -11,7 +11,6 @@ use GuzzleHttp\Psr7\Request; use GuzzleHttp\Psr7\Response; use GuzzleHttp\Psr7\Utils; -use Illuminate\Support\Facades\Redis; use Laravel\Lumen\Testing\TestCase; use Spotlibs\PhpLib\Libraries\ClientExternal; use GuzzleHttp\Handler\MockHandler; @@ -46,12 +45,16 @@ public function testCallEksternal1(): void public function testCallExternalMultipartSuccess(): void { - Redis::shouldReceive('get')->andReturn(json_encode([ + $redisMock = \Mockery::mock(); + $redisMock->shouldReceive('get')->andReturn(json_encode([ 'id' => 1, 'target_url' => 'https://jsonplaceholder.typicode.com/posts', 'mock_url' => 'https://jsonplaceholder.typicode.com/posts', 'flag' => true, ])); + $this->app->singleton('redis', function () use ($redisMock) { + return $redisMock; + }); $f = fopen('public/docs/hello.txt', 'w'); fwrite($f, 'hello world'); fclose($f); diff --git a/tests/Libraries/Kafka/JsonSchemaDecoderTest.php b/tests/Libraries/Kafka/JsonSchemaDecoderTest.php new file mode 100644 index 0000000..599475f --- /dev/null +++ b/tests/Libraries/Kafka/JsonSchemaDecoderTest.php @@ -0,0 +1,154 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; +use Mockery; +use PHPUnit\Framework\TestCase; +use Spotlibs\PhpLib\Exceptions\ParameterException; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaDecoder; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaRegistry; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage; + +/** + * JsonSchemaDecoderTest + * + * Unit test for JsonSchemaDecoder + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaDecoder + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage + * @covers \Spotlibs\PhpLib\Exceptions\ParameterException + */ +class JsonSchemaDecoderTest extends TestCase +{ + protected function tearDown(): void + { + Mockery::close(); + parent::tearDown(); + } + + /** @test */ + public function testDecodeReturnsOriginalWhenBodyNotString(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->once()->andReturn(null); + + $decoder = new JsonSchemaDecoder($registry); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodeReturnsOriginalWhenBodyTooShort(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->once()->andReturn('abc'); + + $decoder = new JsonSchemaDecoder($registry); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodeReturnsOriginalWhenNoMagicByte(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $body = 'X' . pack('N', 1) . '{"a":1}'; + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->once()->andReturn($body); + + $decoder = new JsonSchemaDecoder($registry); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodeSuccess(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('getSchemaById')->once()->with(1)->andReturn('{}'); + + $payload = json_encode(['name' => 'test']); + $body = "\x00" . pack('N', 1) . $payload; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->once()->andReturn($body); + + $decoder = new JsonSchemaDecoder($registry); + $result = $decoder->decode($msg); + + $this->assertInstanceOf(KafkaDecodedMessage::class, $result); + $this->assertSame(['name' => 'test'], $result->getBody()); + } + + /** @test */ + public function testDecodeThrowsOnInvalidJson(): void + { + $this->expectException(ParameterException::class); + + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('getSchemaById')->once()->andReturn('{}'); + + $body = "\x00" . pack('N', 1) . '{invalid json'; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->once()->andReturn($body); + + $decoder = new JsonSchemaDecoder($registry); + $decoder->decode($msg); + } + + /** @test */ + public function testTryDecodeReturnsNullWhenTooShort(): void + { + $result = JsonSchemaDecoder::tryDecode('abc'); + $this->assertNull($result); + } + + /** @test */ + public function testTryDecodeReturnsNullWhenNoMagicByte(): void + { + $body = 'X' . pack('N', 1) . '{"a":1}'; + $result = JsonSchemaDecoder::tryDecode($body); + $this->assertNull($result); + } + + /** @test */ + public function testTryDecodeReturnsNullOnInvalidJson(): void + { + $body = "\x00" . pack('N', 1) . '{not json'; + $result = JsonSchemaDecoder::tryDecode($body); + $this->assertNull($result); + } + + /** @test */ + public function testTryDecodeReturnsArrayOnValidJson(): void + { + $body = "\x00" . pack('N', 1) . json_encode(['foo' => 'bar']); + $result = JsonSchemaDecoder::tryDecode($body); + $this->assertSame(['foo' => 'bar'], $result); + } +} diff --git a/tests/Libraries/Kafka/JsonSchemaEncoderTest.php b/tests/Libraries/Kafka/JsonSchemaEncoderTest.php new file mode 100644 index 0000000..5f18bdf --- /dev/null +++ b/tests/Libraries/Kafka/JsonSchemaEncoderTest.php @@ -0,0 +1,259 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use Jobcloud\Kafka\Message\KafkaProducerMessageInterface; +use Mockery; +use PHPUnit\Framework\TestCase; +use Spotlibs\PhpLib\Exceptions\ParameterException; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaEncoder; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaRegistry; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaJsonSchema; + +/** + * JsonSchemaEncoderTest + * + * Unit test for JsonSchemaEncoder + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaEncoder + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaJsonSchema + * @covers \Spotlibs\PhpLib\Exceptions\ParameterException + */ +class JsonSchemaEncoderTest extends TestCase +{ + protected function tearDown(): void + { + Mockery::close(); + parent::tearDown(); + } + + /** @test */ + public function testEncodeBodyWithRegisteredSchema(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('register')->once()->andReturn(7); + + $bodySchema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['name' => 'test']); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(null); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeBodyUsesExistingSchemaId(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('register')->never(); + + $bodySchema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $bodySchema->setSchemaId(5); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn('{"name":"test"}'); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(null); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeWithKeySchema(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('register')->twice()->andReturn(10); + + $bodySchema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $keySchema = new KafkaJsonSchema('topic-key', '{"type":"string"}'); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['id' => 1]); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn('my-key'); + $msg->shouldReceive('withKey')->andReturnSelf(); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema, $keySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeThrowsOnInvalidSchemaDefinition(): void + { + $this->expectException(ParameterException::class); + + $registry = Mockery::mock(JsonSchemaRegistry::class); + $bodySchema = new KafkaJsonSchema('topic-value', 'not valid json schema{{{'); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['name' => 'test']); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $encoder->encode($msg); + } + + /** @test */ + public function testEncodeThrowsOnInvalidPayload(): void + { + $this->expectException(\TypeError::class); + + $registry = Mockery::mock(JsonSchemaRegistry::class); + $bodySchema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(NAN); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $encoder->encode($msg); + } + + /** @test */ + public function testEncodeThrowsOnMissingRequiredField(): void + { + $this->expectException(ParameterException::class); + + $registry = Mockery::mock(JsonSchemaRegistry::class); + $schema = '{"type":"object","required":["name"],"properties":{"name":{"type":"string"}}}'; + $bodySchema = new KafkaJsonSchema('topic-value', $schema); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['age' => 30]); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $encoder->encode($msg); + } + + /** @test */ + public function testEncodeThrowsOnTypeMismatch(): void + { + $this->expectException(ParameterException::class); + + $registry = Mockery::mock(JsonSchemaRegistry::class); + $schema = '{"type":"object","properties":{"name":{"type":"string"}}}'; + $bodySchema = new KafkaJsonSchema('topic-value', $schema); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['name' => 123]); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $encoder->encode($msg); + } + + /** @test */ + public function testEncodePassesWithCorrectTypes(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $schema = '{"type":"object","required":["name","age"],"properties":{"name":{"type":"string"},"age":{"type":"integer"},"active":{"type":"boolean"},"score":{"type":"number"},"tags":{"type":"array"},"meta":{"type":"object"},"nothing":{"type":"null"}}}'; + $bodySchema = new KafkaJsonSchema('topic-value', $schema); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn([ + 'name' => 'test', + 'age' => 25, + 'active' => true, + 'score' => 9.5, + 'tags' => ['a'], + 'meta' => (object)['k' => 'v'], + 'nothing' => null, + ]); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(null); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeSkipsTypeValidationWhenNoTypeInSchema(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $schema = '{"type":"object","properties":{"name":{}}}'; + $bodySchema = new KafkaJsonSchema('topic-value', $schema); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['name' => 123]); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(null); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeWithMultiTypeAllowsNullable(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $schema = '{"type":"object","properties":{"name":{"type":["string","null"]}}}'; + $bodySchema = new KafkaJsonSchema('topic-value', $schema); + $bodySchema->setSchemaId(1); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['name' => null]); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(null); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testEncodeWithKeySchemaAndNonStringKey(): void + { + $registry = Mockery::mock(JsonSchemaRegistry::class); + $registry->shouldReceive('register')->twice()->andReturn(10); + + $bodySchema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $keySchema = new KafkaJsonSchema('topic-key', '{"type":"object"}'); + + $msg = Mockery::mock(KafkaProducerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(['id' => 1]); + $msg->shouldReceive('withBody')->andReturnSelf(); + $msg->shouldReceive('getKey')->andReturn(['id' => 1]); + $msg->shouldReceive('withKey')->andReturnSelf(); + + $encoder = new JsonSchemaEncoder($registry, $bodySchema, $keySchema); + $result = $encoder->encode($msg); + + $this->assertSame($msg, $result); + } +} diff --git a/tests/Libraries/Kafka/JsonSchemaRegistryTest.php b/tests/Libraries/Kafka/JsonSchemaRegistryTest.php new file mode 100644 index 0000000..85591f4 --- /dev/null +++ b/tests/Libraries/Kafka/JsonSchemaRegistryTest.php @@ -0,0 +1,164 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use GuzzleHttp\Client as GuzzleClient; +use Mockery; +use PHPUnit\Framework\TestCase; +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\StreamInterface; +use Spotlibs\PhpLib\Exceptions\ParameterException; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaRegistry; + +/** + * JsonSchemaRegistryTest + * + * Unit test for JsonSchemaRegistry + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaRegistry + * @covers \Spotlibs\PhpLib\Exceptions\ParameterException + */ +class JsonSchemaRegistryTest extends TestCase +{ + protected function tearDown(): void + { + Mockery::close(); + parent::tearDown(); + } + + /** @test */ + public function testRegisterSuccess(): void + { + $stream = Mockery::mock(StreamInterface::class); + $stream->shouldReceive('getContents')->once()->andReturn(json_encode(['id' => 10])); + + $response = Mockery::mock(ResponseInterface::class); + $response->shouldReceive('getBody')->once()->andReturn($stream); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('post')->once()->andReturn($response); + + $registry = new JsonSchemaRegistry($client); + $result = $registry->register('topic-value', '{"type":"object"}'); + + $this->assertSame(10, $result); + } + + /** @test */ + public function testRegisterThrowsWhenNoId(): void + { + $this->expectException(ParameterException::class); + + $stream = Mockery::mock(StreamInterface::class); + $stream->shouldReceive('getContents')->once()->andReturn(json_encode([])); + + $response = Mockery::mock(ResponseInterface::class); + $response->shouldReceive('getBody')->once()->andReturn($stream); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('post')->once()->andReturn($response); + + $registry = new JsonSchemaRegistry($client); + $registry->register('topic-value', '{"type":"object"}'); + } + + /** @test */ + public function testRegisterThrowsOnHttpError(): void + { + $this->expectException(ParameterException::class); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('post')->once()->andThrow(new \RuntimeException('connection failed')); + + $registry = new JsonSchemaRegistry($client); + $registry->register('topic-value', '{"type":"object"}'); + } + + /** @test */ + public function testGetSchemaByIdSuccess(): void + { + $stream = Mockery::mock(StreamInterface::class); + $stream->shouldReceive('getContents')->once()->andReturn(json_encode(['schema' => '{"type":"object"}'])); + + $response = Mockery::mock(ResponseInterface::class); + $response->shouldReceive('getBody')->once()->andReturn($stream); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('get')->once()->andReturn($response); + + $registry = new JsonSchemaRegistry($client); + $result = $registry->getSchemaById(5); + + $this->assertSame('{"type":"object"}', $result); + } + + /** @test */ + public function testGetSchemaByIdUsesCache(): void + { + // First call populates cache + $stream = Mockery::mock(StreamInterface::class); + $stream->shouldReceive('getContents')->once()->andReturn(json_encode(['id' => 5])); + + $response = Mockery::mock(ResponseInterface::class); + $response->shouldReceive('getBody')->once()->andReturn($stream); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('post')->once()->andReturn($response); + $client->shouldReceive('get')->never(); + + $registry = new JsonSchemaRegistry($client); + $registry->register('topic-value', '{"type":"object"}'); + + // Second call should use cache + $result = $registry->getSchemaById(5); + $this->assertSame('{"type":"object"}', $result); + } + + /** @test */ + public function testGetSchemaByIdThrowsWhenNoSchema(): void + { + $this->expectException(ParameterException::class); + + $stream = Mockery::mock(StreamInterface::class); + $stream->shouldReceive('getContents')->once()->andReturn(json_encode([])); + + $response = Mockery::mock(ResponseInterface::class); + $response->shouldReceive('getBody')->once()->andReturn($stream); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('get')->once()->andReturn($response); + + $registry = new JsonSchemaRegistry($client); + $registry->getSchemaById(99); + } + + /** @test */ + public function testGetSchemaByIdThrowsOnHttpError(): void + { + $this->expectException(ParameterException::class); + + $client = Mockery::mock(GuzzleClient::class); + $client->shouldReceive('get')->once()->andThrow(new \RuntimeException('timeout')); + + $registry = new JsonSchemaRegistry($client); + $registry->getSchemaById(99); + } +} diff --git a/tests/Libraries/Kafka/KafkaAutoDecoderTest.php b/tests/Libraries/Kafka/KafkaAutoDecoderTest.php new file mode 100644 index 0000000..ecfcddc --- /dev/null +++ b/tests/Libraries/Kafka/KafkaAutoDecoderTest.php @@ -0,0 +1,232 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use FlixTech\AvroSerializer\Objects\RecordSerializer; +use Jobcloud\Kafka\Message\Decoder\AvroDecoder; +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; +use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistryInterface; +use Mockery; +use PHPUnit\Framework\TestCase; +use Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaDecoder; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaAutoDecoder; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage; + +/** + * KafkaAutoDecoderTest + * + * Unit test for KafkaAutoDecoder + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaAutoDecoder + * @covers \Spotlibs\PhpLib\Libraries\Kafka\JsonSchemaDecoder + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage + */ +class KafkaAutoDecoderTest extends TestCase +{ + private AvroDecoder $avroDecoder; + + protected function setUp(): void + { + parent::setUp(); + $registry = Mockery::mock(AvroSchemaRegistryInterface::class); + $recordSerializer = Mockery::mock(RecordSerializer::class); + $this->avroDecoder = new AvroDecoder($registry, $recordSerializer); + } + + protected function tearDown(): void + { + Mockery::close(); + parent::tearDown(); + } + + /** @test */ + public function testDecodeReturnsOriginalWhenBodyNotString(): void + { + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn(null); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodeReturnsOriginalWhenBodyTooShort(): void + { + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn('short'); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodeWireFormatJsonSchemaWithDecoder(): void + { + $payload = json_encode(['key' => 'value']); + $body = "\x00" . pack('N', 1) . $payload; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $decodedMsg = Mockery::mock(KafkaConsumerMessageInterface::class); + + $jsonDecoder = Mockery::mock(JsonSchemaDecoder::class); + $jsonDecoder->shouldReceive('decode')->once()->andReturn($decodedMsg); + + $decoder = new KafkaAutoDecoder($this->avroDecoder, $jsonDecoder); + $result = $decoder->decode($msg); + + $this->assertSame($decodedMsg, $result); + } + + /** @test */ + public function testDecodeWireFormatJsonSchemaFallsBackOnDecoderException(): void + { + $payload = json_encode(['key' => 'value']); + $body = "\x00" . pack('N', 1) . $payload; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $jsonDecoder = Mockery::mock(JsonSchemaDecoder::class); + $jsonDecoder->shouldReceive('decode')->once()->andThrow(new \RuntimeException('fail')); + + $decoder = new KafkaAutoDecoder($this->avroDecoder, $jsonDecoder); + $result = $decoder->decode($msg); + + $this->assertInstanceOf(KafkaDecodedMessage::class, $result); + $this->assertSame(['key' => 'value'], $result->getBody()); + } + + /** @test */ + public function testDecodeWireFormatJsonSchemaWithoutDecoder(): void + { + $payload = json_encode(['key' => 'value']); + $body = "\x00" . pack('N', 1) . $payload; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertInstanceOf(KafkaDecodedMessage::class, $result); + $this->assertSame(['key' => 'value'], $result->getBody()); + } + + /** @test */ + public function testDecodeWireFormatFallsBackToAvro(): void + { + // Binary payload that is NOT valid JSON after header + $body = "\x00" . pack('N', 1) . "\x02\x06foo"; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + $msg->shouldReceive('getTopicName')->andReturn('test-topic'); + $msg->shouldReceive('getPartition')->andReturn(0); + $msg->shouldReceive('getOffset')->andReturn(0); + $msg->shouldReceive('getTimestamp')->andReturn(0); + $msg->shouldReceive('getKey')->andReturn(null); + $msg->shouldReceive('getHeaders')->andReturn([]); + + $registry = Mockery::mock(AvroSchemaRegistryInterface::class); + $registry->shouldReceive('hasBodySchemaForTopic')->andReturn(false); + $registry->shouldReceive('hasKeySchemaForTopic')->andReturn(false); + + $recordSerializer = Mockery::mock(RecordSerializer::class); + $avroDecoder = new AvroDecoder($registry, $recordSerializer); + + $decoder = new KafkaAutoDecoder($avroDecoder); + $result = $decoder->decode($msg); + + // AvroDecoder returns a new KafkaConsumerMessage (not the same instance) + $this->assertInstanceOf(KafkaConsumerMessageInterface::class, $result); + } + + /** @test */ + public function testDecodeWireFormatReturnsOriginalWhenAvroFails(): void + { + $body = "\x00" . pack('N', 1) . "\x02\x06foo"; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + $msg->shouldReceive('getTopicName')->andReturn('test-topic'); + + $registry = Mockery::mock(AvroSchemaRegistryInterface::class); + $registry->shouldReceive('hasBodySchemaForTopic')->andThrow(new \RuntimeException('avro fail')); + + $recordSerializer = Mockery::mock(RecordSerializer::class); + $avroDecoder = new AvroDecoder($registry, $recordSerializer); + + $decoder = new KafkaAutoDecoder($avroDecoder); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } + + /** @test */ + public function testDecodePlainJson(): void + { + $body = json_encode(['hello' => 'world']); + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertInstanceOf(KafkaDecodedMessage::class, $result); + $this->assertSame(['hello' => 'world'], $result->getBody()); + } + + /** @test */ + public function testDecodePlainJsonArray(): void + { + $body = json_encode([1, 2, 3]); + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertInstanceOf(KafkaDecodedMessage::class, $result); + $this->assertSame([1, 2, 3], $result->getBody()); + } + + /** @test */ + public function testDecodeReturnsOriginalForNonJsonPlainText(): void + { + $body = 'this is just plain text that is longer than 5 chars'; + + $msg = Mockery::mock(KafkaConsumerMessageInterface::class); + $msg->shouldReceive('getBody')->andReturn($body); + + $decoder = new KafkaAutoDecoder($this->avroDecoder); + $result = $decoder->decode($msg); + + $this->assertSame($msg, $result); + } +} diff --git a/tests/Libraries/Kafka/KafkaDecodedMessageTest.php b/tests/Libraries/Kafka/KafkaDecodedMessageTest.php new file mode 100644 index 0000000..62d3248 --- /dev/null +++ b/tests/Libraries/Kafka/KafkaDecodedMessageTest.php @@ -0,0 +1,112 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; +use Mockery; +use PHPUnit\Framework\TestCase; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage; + +/** + * KafkaDecodedMessageTest + * + * Unit test for KafkaDecodedMessage + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaDecodedMessage + */ +class KafkaDecodedMessageTest extends TestCase +{ + protected function tearDown(): void + { + Mockery::close(); + parent::tearDown(); + } + + /** @test */ + public function testGetBodyReturnsDecodedBody(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $decoded = ['key' => 'value']; + + $msg = new KafkaDecodedMessage($original, $decoded); + $this->assertSame($decoded, $msg->getBody()); + } + + /** @test */ + public function testGetKeyDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getKey')->once()->andReturn('my-key'); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame('my-key', $msg->getKey()); + } + + /** @test */ + public function testGetTopicNameDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getTopicName')->once()->andReturn('my-topic'); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame('my-topic', $msg->getTopicName()); + } + + /** @test */ + public function testGetPartitionDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getPartition')->once()->andReturn(3); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame(3, $msg->getPartition()); + } + + /** @test */ + public function testGetHeadersDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getHeaders')->once()->andReturn(['h' => 'v']); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame(['h' => 'v'], $msg->getHeaders()); + } + + /** @test */ + public function testGetOffsetDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getOffset')->once()->andReturn(100); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame(100, $msg->getOffset()); + } + + /** @test */ + public function testGetTimestampDelegatesToOriginal(): void + { + $original = Mockery::mock(KafkaConsumerMessageInterface::class); + $original->shouldReceive('getTimestamp')->once()->andReturn(1234567890); + + $msg = new KafkaDecodedMessage($original, []); + $this->assertSame(1234567890, $msg->getTimestamp()); + } +} diff --git a/tests/Libraries/Kafka/KafkaJsonSchemaTest.php b/tests/Libraries/Kafka/KafkaJsonSchemaTest.php new file mode 100644 index 0000000..5c08dac --- /dev/null +++ b/tests/Libraries/Kafka/KafkaJsonSchemaTest.php @@ -0,0 +1,63 @@ + + * @license https://mit-license.org/ MIT License + * @version GIT: 0.0.1 + * @link https://github.com/ + */ + +declare(strict_types=1); + +namespace Tests\Libraries\Kafka; + +use PHPUnit\Framework\TestCase; +use Spotlibs\PhpLib\Libraries\Kafka\KafkaJsonSchema; + +/** + * KafkaJsonSchemaTest + * + * Unit test for KafkaJsonSchema + * + * @category Test + * @package Tests\Libraries\Kafka + * @author Mufthi Ryanda + * @license https://mit-license.org/ MIT License + * @link https://github.com/ + * @covers \Spotlibs\PhpLib\Libraries\Kafka\KafkaJsonSchema + */ +class KafkaJsonSchemaTest extends TestCase +{ + /** @test */ + public function testGetSubject(): void + { + $schema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $this->assertSame('topic-value', $schema->getSubject()); + } + + /** @test */ + public function testGetDefinition(): void + { + $schema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $this->assertSame('{"type":"object"}', $schema->getDefinition()); + } + + /** @test */ + public function testGetSchemaIdReturnsNullByDefault(): void + { + $schema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $this->assertNull($schema->getSchemaId()); + } + + /** @test */ + public function testSetAndGetSchemaId(): void + { + $schema = new KafkaJsonSchema('topic-value', '{"type":"object"}'); + $schema->setSchemaId(42); + $this->assertSame(42, $schema->getSchemaId()); + } +} From bc498058dd7af2f607db13f48a623e9d6a1d23a2 Mon Sep 17 00:00:00 2001 From: Mufthi Ryanda <77824812+mufthiryanda@users.noreply.github.com> Date: Thu, 21 May 2026 11:16:36 +0700 Subject: [PATCH 11/11] feat : add exclude on Kafka UT --- phpunit.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/phpunit.xml b/phpunit.xml index 3c5e38b..5030974 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -31,6 +31,7 @@ src/Exceptions/Handler.php src/Libraries/Kafka.php + src/Libraries/Kafka/Kafka.php src/Libraries/KafkaCallable.php src/Libraries/Storage.php src/Commands