From f279d3ad390cdb501cf02c659ed8d4c4c1ad3516 Mon Sep 17 00:00:00 2001 From: joshuajnoble Date: Thu, 11 Jun 2026 14:19:51 -0700 Subject: [PATCH 01/10] removing images Signed-off-by: joshuajnoble --- .../confluent_modernization_notebook.ipynb | 782 ++++++++++++++++++ .../dashboard_two_stream.py | 233 ++++++ .../confluent-modernization/requirements.txt | 51 ++ 3 files changed, 1066 insertions(+) create mode 100644 tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb create mode 100644 tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py create mode 100644 tutorials/17-data-streaming/confluent-modernization/requirements.txt diff --git a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb new file mode 100644 index 000000000..6867ac897 --- /dev/null +++ b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb @@ -0,0 +1,782 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "215514af", + "metadata": {}, + "source": [ + "Apache Kafka is a powerful distributed event streaming system that provides real-time data. Event-driven architectures are the most commonly used strategy for streaming data: continuous generation and transmission of data from sources like sensors, websites, and financial transactions However enterprises often need additional data governance, data security, data integration, observability, and operational data management. Confluent packages all of these capabilities into a data platform that reduces the complexity of deploying and operating Kafka at scale. Running Kafka clusters in production requires managing brokers, storage, networking, scaling, upgrades, monitoring, and disaster recovery. Confluent offers managed cloud services and operational tooling that reduce the burden on infrastructure teams and allow developers to focus on building applications rather than maintaining clusters.\n", + "\n", + "Rather than treating Kafka as a messaging system, Confluent promotes Kafka as a central nervous system for enterprise data. Events can be shared across applications, advanced analytics systems, artificial intelligence and machine learning platforms, and operational systems in real time. This enables data architectures that are more responsive and less dependent on either batch processing or batch data ingestion and that speed data-driven decision-making and business intelligence.\n", + "\n", + "To see how this story plays out, imagine the use cases of a large health insurance provider, “Wellspring Health,” which already runs Apache Kafka on-premises to process healthcare and insurance events.\n", + "\n", + "Their legacy system contains a Kafka environment which handles:\n", + "\n", + "* Medical claims\n", + "* Pharmacy transactions\n", + "* Fraud detection\n", + "* Provider billing events\n", + "* Member eligibility updates\n", + "\n", + "The system works but data teams struggle with inconsistent schemas, unknown downstream consumers, custom ETL pipelines, and slow data analytics onboarding. To improve operational efficiency, Wellspring looks to adopt the cloud-based Confluent Cloud in a data modernization initiative to upgrade their streaming platform while still preserving their existing Kafka investments and knowledge.\n", + "\n", + "The highest priorities on the roadmap for Wellspring are to:\n", + "\n", + "1. Detect potentially fraudulent claims in real time\n", + "2. Provide access controls to govern sensitive data structures\n", + "3. Easily provide Iceberg snapshots for ingestion into a data lake or data warehouse\n", + "\n", + "In this tutorial, you'll see how Wellspring can modernize and optimize their data infrastructre and Kafka architectures with Confluent to easily create derived big data streams, improve data processing, and create dashboards. This will give them a more scalable and unified data infrastructure and analytics ecosystem.\n", + "\n", + "The first step along this path is to take the Kafka streams that they're already using in an on-prem environment and move them to Confluent Cloud. \n", + "\n", + "You can find the code sections of this tutorial in our [Github repository](https://github.com/IBM/ibmdotcom-tutorials) in two different files: confluent_modernization_notebook.ipynb and dashboard_two_stream.py \n", + "\n", + "# Step 1\n", + "\n", + "Create an account in Confluent Cloud. You can use Github, Google, or simply use your own email address. Answer the initial questions however you'd like. When you reach the stage to create your own cluster, select \"Explore other cluster types and pricing\".\n", + "\n", + "![](./images/sign_up.png)\n", + "\n", + "Create the environment. For the purposes of this demo, call it \"default\".\n", + "\n", + "![](./images/0_a_create_account.png)\n", + "\n", + "Next, create the cluster. For the purposes of this, this one is named \"tutorial_cluster\" but you can choose any name that you'd like. Use the Basic cluster for this tutorial since it has pricing compatible with a simple proof of concept.\n", + "\n", + "![](./images/0_b_create_cluster.png)\n", + "\n", + "Now you're ready to create your first Kafka topic. A Kafka topic is the fundamental unit of organization in Apache Kafka. You can think of it as a feed name or logical channel where data records are published and stored.\n", + "\n", + "# Step 2\n", + "\n", + "After launching your cluster you can now create the first topic, called `medical_claims` to store incoming medical claims.\n", + "\n", + "![](./images/1_create_topic.png)\n", + "\n", + "Edit the data contract:\n", + "\n", + "![](./images/2_topic_data_contract.png)\n", + "\n", + "```\n", + "{\n", + " \"connect.name\": \"com.wellspring.claims.MedicalClaim\",\n", + " \"connect.parameters\": {\n", + " \"io.confluent.connect.avro.field.doc.event_time\": \"The string is a unicode character sequence.\",\n", + " \"io.confluent.connect.avro.record.doc\": \"Schema for a medical procedure claim.\"\n", + " },\n", + " \"doc\": \"Schema for a medical procedure claim.\",\n", + " \"fields\": [\n", + " {\n", + " \"name\": \"member_id\",\n", + " \"type\": \"int\"\n", + " },\n", + " {\n", + " \"name\": \"claim_id\",\n", + " \"type\": \"int\"\n", + " },\n", + " {\n", + " \"name\": \"provider_id\",\n", + " \"type\": \"int\"\n", + " },\n", + " {\n", + " \"name\": \"procedure_code\",\n", + " \"type\": \"string\"\n", + " },\n", + " {\n", + " \"name\": \"diagnosis_code\",\n", + " \"type\": \"string\"\n", + " },\n", + " {\n", + " \"name\": \"claim_amount\",\n", + " \"type\": \"float\"\n", + " },\n", + " {\n", + " \"name\": \"claim_status\",\n", + " \"type\": \"int\"\n", + " },\n", + " {\n", + " \"doc\": \"The string is a unicode character sequence.\",\n", + " \"name\": \"event_time\",\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"name\": \"MedicalClaim\",\n", + " \"namespace\": \"com.wellspring.claims\",\n", + " \"type\": \"record\"\n", + "}\n", + "```\n", + "\n", + "Now you'll add the Data Topic to Schema Registry. This provides a single point of Governance via the Schema Registry so that all clients querying topics know what to expect.\n", + "\n", + "Navigate to the Schema Registry by navigating to your cluster:\n", + "\n", + "![](./images/3_d_schema_registry_location.png)\n", + "\n", + "Then select Data Contracts and then Add Data Contract:\n", + "\n", + "![](./images/3_a_schema_registry.png)\n", + "\n", + "Next, enter the data contract for `medical_claims`:\n", + "\n", + "![](./images/3_b_schema_registry.png)\n", + "\n", + "Now select the API endpoint and note the URL. This is how you can access the schema registry from any client.\n", + "\n", + "To see how this works in action, create a user account to access the schema registry remotely.\n", + "\n", + "Select your username from the upper right menu and then navigate the API Keys view view, then then select Add API Key. Enter whatever name you'd like and select `Schema Registry` for the key scope and `default` for the environment.\n", + "\n", + "![](./images/4_add_key.png)\n", + "\n", + "This generates a new key. Download the key, open the downloaded text file, and copy the values into your .env file. Save the API key as `REGISTRY_API_USER`, the API Secret as `REGISTRY_API_SECRET`, and the registry URL as `REGISTRY_URL`. Then save and close your .env file.\n", + "\n", + "You can now test the Schema Registry like so. You'll want to create a Python environment and install the libraries found in the `requirements.txt` file in the project repository.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e79c94b1", + "metadata": {}, + "outputs": [], + "source": [ + "%pip install pyiceberg requests confluent_kafka dotenv" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "deb120bc", + "metadata": {}, + "outputs": [], + "source": [ + "import datetime\n", + "import os\n", + "from random import randint\n", + "from time import sleep\n", + "\n", + "import requests\n", + "from confluent_kafka import Producer, SerializingProducer\n", + "from confluent_kafka.schema_registry import SchemaRegistryClient\n", + "from confluent_kafka.schema_registry.avro import AvroSerializer\n", + "from confluent_kafka.serialization import MessageField, SerializationContext, StringSerializer\n", + "from dotenv import load_dotenv\n", + "from requests.auth import HTTPBasicAuth" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4910f2aa", + "metadata": {}, + "outputs": [], + "source": [ + "# Load env variables\n", + "load_dotenv()" + ] + }, + { + "cell_type": "markdown", + "id": "7aaa286c", + "metadata": {}, + "source": [ + "To test that your registry is working correctly, run the following:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bc836b55", + "metadata": {}, + "outputs": [], + "source": [ + "# Setup Registry and Serializer\n", + "sr_conf = {\n", + " \"url\": str(os.getenv(\"REGISTRY_URL\")),\n", + " \"basic.auth.user.info\": f'{str(os.getenv(\"REGISTRY_API_USER\"))}:{str(os.getenv(\"REGISTRY_API_SECRET\"))}',\n", + "}\n", + "schema_registry_client = SchemaRegistryClient(sr_conf)\n", + "\n", + "schema_str = schema_registry_client.get_version(\n", + " \"medical_claims-value\", schema_registry_client.get_versions(\"medical_claims-value\")[-1]\n", + ").schema.schema_str\n", + "\n", + "print(schema_str)\n", + "\n", + "schema = json.loads(schema_str) # type: ignore\n", + "\n", + "print(\" ============ Schema Fields ============\")\n", + "for f in schema[\"fields\"]:\n", + " print(f[\"name\"])" + ] + }, + { + "cell_type": "markdown", + "id": "04c8ceda", + "metadata": {}, + "source": [ + "Now you can see how any client access the streams will be able to see the data contract for that stream and any changes that may have occured in it before reading or writing to it.\n", + "\n", + "# Step 3\n", + "\n", + "Now you can create derived topics in Confluent Cloud through an automation that takes the form of a ksqlDB statement. These can check data quality, aggregate or process event streams, send notifications, and many other data workflows. In Confluent architectures, topics are generated through:\n", + "\n", + "* stream processing\n", + "* joins\n", + "* aggregations\n", + "* enrichment pipelines\n", + "\n", + "Often using:\n", + "\n", + "* ksqlDB\n", + "* Kafka Streams\n", + "* Apache Flink\n", + "\n", + "You'll see how Confleunts ksqlDB can help Wellspring create two derived topics to streamline data access. The first is to track high value claims of more than $10,000.\n", + "\n", + "You'll need a globally scoped key for the ksqlDB operations. Select your username from the upper right menu and then navigate to the API Keys view, then then select Add API Key. Enter whatever name you'd like and select `Global` for the key scope.\n", + "\n", + " This generates a new key you can use across Confluent Cloud. Download the key and open that text file and copy values into your .env file. Save the `API key` from the text file as `REQUESTS_API_KEY` in your .env and the `API Secret` from the text file as `REQUESTS_API_SECRET` in your text file.\n", + "\n", + " You'll also copy your User ID from your User Settings. \n", + "\n", + " ![](images/4_global_key.png)\n", + "\n", + "Save this to your .env file as `SERVICE_ACCOUNT_ID`.\n", + "\n", + "Open the Environment details in the Environment tab:\n", + "\n", + "![](images/9_environment_id.png)\n", + "\n", + "Copy the env ID and save it to your .env file as `ENV_ID`.\n", + "\n", + "Open the Cluster details in the Cluster tab:\n", + "\n", + "![](images/11_cluster_ID.png)\n", + "\n", + "Save this to your .env file as `CLUSTER_ID`.\n", + "\n", + "First you'll create the ksql database using the REST URL from your environment:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b58e4dda", + "metadata": {}, + "outputs": [], + "source": [ + "def create_db():\n", + " payload = {\n", + " \"spec\": {\n", + " \"display_name\": \"medical-claims-ksqldb\",\n", + " \"csu\": 4,\n", + " \"environment\": {\n", + " \"id\": os.getenv(\"ENV_ID\") # the ID of your environment\n", + " },\n", + " \"kafka_cluster\": {\n", + " \"id\": os.getenv(\"CLUSTER_ID\") # the ID of your cluster\n", + " },\n", + " \"credential_identity\": {\n", + " \"id\": os.getenv(\n", + " \"SERVICE_ACCOUNT_ID\"\n", + " ) # the ID of your user profile in Confluent Cloud\n", + " },\n", + " }\n", + " }\n", + "\n", + " response = requests.post(\n", + " \"https://api.confluent.cloud/ksqldbcm/v2/clusters\",\n", + " json=payload,\n", + " auth=HTTPBasicAuth(\n", + " str(os.getenv(\"REQUESTS_API_KEY\")), str(os.getenv(\"REQUESTS_API_SECRET\"))\n", + " ),\n", + " )\n", + "\n", + " print(response.status_code)\n", + " print(response.json())\n", + "\n", + "\n", + "create_db()" + ] + }, + { + "cell_type": "markdown", + "id": "c8b310eb", + "metadata": {}, + "source": [ + "Now you've created a ksql database. This is a purpose-built event streaming database that lets you process and analyze data in Apache Kafka using standard, lightweight SQL syntax. To use your new ksqlDB, go to the Confluent Cloud console and open the ksqlDB menu item. Note that this may take a few minutes to provision and show up in the Confluent Cloud UI.\n", + "\n", + "Go to settings and copy the Endpoint URL.\n", + "\n", + "![](./images/5_ksql_rest_url.png)\n", + "\n", + "Save this to your `.env` file as `KSQL_ENDPOINT`.\n", + "\n", + "Now you can create a base stream to work with ksqlDB:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0e92172a", + "metadata": {}, + "outputs": [], + "source": [ + "def create_base_stream():\n", + " sql = \"\"\"\n", + " CREATE STREAM medical_claims_stream (\n", + " claim_id VARCHAR,\n", + " member_id VARCHAR,\n", + " provider_id VARCHAR,\n", + " procedure_code VARCHAR,\n", + " diagnosis_code VARCHAR,\n", + " claim_amount DOUBLE,\n", + " claim_status VARCHAR,\n", + " event_time VARCHAR\n", + " )\n", + " WITH (\n", + " KAFKA_TOPIC='medical_claims',\n", + " VALUE_FORMAT='AVRO'\n", + " );\n", + " \"\"\"\n", + "\n", + " response = requests.post(\n", + " str(os.getenv(\"KSQL_ENDPOINT\")),\n", + " auth=HTTPBasicAuth(\n", + " str(os.getenv(\"REQUESTS_API_KEY\")), str(os.getenv(\"REQUESTS_API_SECRET\"))\n", + " ),\n", + " headers={\"Content-Type\": \"application/vnd.ksql.v1+json; charset=utf-8\"},\n", + " json={\"ksql\": sql},\n", + " )\n", + "\n", + " print(response.status_code)\n", + " if response.status_code != 404:\n", + " print(response.json())\n", + "\n", + "\n", + "create_base_stream()" + ] + }, + { + "cell_type": "markdown", + "id": "086d93da", + "metadata": {}, + "source": [ + "Now with that base stream, you can use SQL statements to filter all of the data coming from the base stream into a derived stream for real-time analytics:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1c52afcc", + "metadata": {}, + "outputs": [], + "source": [ + "def create_derived_stream():\n", + " sql = \"\"\"\n", + " CREATE STREAM high_value_claims AS\n", + " SELECT\n", + " claim_id,\n", + " member_id,\n", + " provider_id,\n", + " claim_amount,\n", + " procedure_code\n", + " FROM medical_claims_stream\n", + " WHERE claim_amount > 10000\n", + " EMIT CHANGES;\n", + " \"\"\"\n", + "\n", + " response = requests.post(\n", + " str(os.getenv(\"KSQL_ENDPOINT\")),\n", + " auth=HTTPBasicAuth(\n", + " str(os.getenv(\"REQUESTS_API_KEY\")), str(os.getenv(\"REQUESTS_API_SECRET\"))\n", + " ),\n", + " headers={\"Content-Type\": \"application/vnd.ksql.v1+json; charset=utf-8\"},\n", + " json={\"ksql\": sql},\n", + " )\n", + "\n", + " print(response.status_code)\n", + " if response.status_code != 404:\n", + " print(response.json())\n", + "\n", + "\n", + "create_derived_stream()" + ] + }, + { + "cell_type": "markdown", + "id": "55d85161", + "metadata": {}, + "source": [ + "\n", + "You can also create complex windowing. For instance, there are certain kinds of procudures that should trigger an audit and multiple auditable claims in a short period of time indicate that a providers systems may have been compromised in a data breach.\n", + "\n", + "To help track this, you'll create an `auditor_stream` of claims that are over $5000 for procudures '7371', '2710', and '1831'. Then, create a new stream that captures whether there are multiple items in the auditor stream in any 10 minute period and capture that as `provider_claim_spikes`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1be7819b", + "metadata": {}, + "outputs": [], + "source": [ + "def create_auditor_stream():\n", + " sql = \"\"\"\n", + " CREATE STREAM auditor_stream AS\n", + " SELECT\n", + " claim_id,\n", + " member_id,\n", + " provider_id,\n", + " claim_amount,\n", + " procedure_code,\n", + " diagnosis_code\n", + " FROM medical_claims_stream\n", + " WHERE claim_amount > 5000\n", + " AND procedure_code IN ('7371', '2710', '1831')\n", + " EMIT CHANGES;\n", + " \"\"\"\n", + "\n", + " response = requests.post(\n", + " str(os.getenv(\"KSQL_ENDPOINT\")),\n", + " auth=HTTPBasicAuth(\n", + " str(os.getenv(\"REQUESTS_API_KEY\")), str(os.getenv(\"REQUESTS_API_SECRET\"))\n", + " ),\n", + " headers={\"Content-Type\": \"application/vnd.ksql.v1+json; charset=utf-8\"},\n", + " json={\"ksql\": sql},\n", + " )\n", + "\n", + " print(response.status_code)\n", + " if response.status_code != 404:\n", + " print(response.json())\n", + "\n", + "\n", + "create_auditor_stream()" + ] + }, + { + "cell_type": "markdown", + "id": "0ca00cde", + "metadata": {}, + "source": [ + "We can now create a provider claim spike event stream that will capture when multiple claims for a single provider are submitted in a short window:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8a58a904", + "metadata": {}, + "outputs": [], + "source": [ + "def create_provider_claim_spike():\n", + " sql = \"\"\"\n", + " CREATE TABLE provider_claim_spikes AS\n", + " SELECT provider_id,\n", + " COUNT(*) AS claim_count,\n", + " SUM(claim_amount) AS total_claim_value\n", + " FROM auditor_stream\n", + " WINDOW TUMBLING (SIZE 10 MINUTES)\n", + " GROUP BY provider_id\n", + " EMIT CHANGES;\n", + " \"\"\"\n", + "\n", + " response = requests.post(\n", + " str(os.getenv(\"KSQL_ENDPOINT\")),\n", + " auth=HTTPBasicAuth(\n", + " str(os.getenv(\"REQUESTS_API_KEY\")), str(os.getenv(\"REQUESTS_API_SECRET\"))\n", + " ),\n", + " headers={\"Content-Type\": \"application/vnd.ksql.v1+json; charset=utf-8\"},\n", + " json={\"ksql\": sql},\n", + " )\n", + "\n", + " print(response.status_code)\n", + " if response.status_code != 404:\n", + " print(response.json())\n", + "\n", + "\n", + "create_provider_claim_spike()" + ] + }, + { + "cell_type": "markdown", + "id": "bd570d9f", + "metadata": {}, + "source": [ + "One important thing to note is that the creation of these derived streams will append extra characters to the beginning of the name of the derived topic. You can see the correct name in the Topic for your cluster:\n", + "\n", + "![](./images/3_d_topics_view.png)\n", + "\n", + "## Step 4 - Query derived topics\n", + "\n", + "Now that you've created the derived topics, you can query that stream. Run the following code in a new Python session so that you can see results come in as the are published to the base `medical_claims` topic.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0ffff516", + "metadata": {}, + "outputs": [], + "source": [ + "def query_high_value():\n", + " query = \"\"\"\n", + " SELECT *\n", + " FROM high_value_claims\n", + " EMIT CHANGES;\n", + " \"\"\"\n", + "\n", + " response = requests.post(\n", + " str(os.getenv(\"KSQL_ENDPOINT\")) + \"/query-stream\",\n", + " auth=HTTPBasicAuth(\n", + " str(os.getenv(\"REQUESTS_API_KEY\")), str(os.getenv(\"REQUESTS_API_SECRET\"))\n", + " ),\n", + " headers={\"Content-Type\": \"application/vnd.ksql.v1+json; charset=utf-8\"},\n", + " json={\"sql\": query},\n", + " stream=True,\n", + " )\n", + "\n", + " for line in response.iter_lines():\n", + " if line:\n", + " print(line.decode(\"utf-8\"))\n", + "\n", + "\n", + "def query_provider_spike():\n", + " query = \"\"\"\n", + " SELECT *\n", + " FROM provider_claim_spikes\n", + " EMIT CHANGES;\n", + " \"\"\"\n", + "\n", + " response = requests.post(\n", + " str(os.getenv(\"KSQL_ENDPOINT\")) + \"/query-stream\",\n", + " auth=HTTPBasicAuth(\n", + " str(os.getenv(\"REQUESTS_API_KEY\")), str(os.getenv(\"REQUESTS_API_SECRET\"))\n", + " ),\n", + " headers={\"Content-Type\": \"application/vnd.ksql.v1+json; charset=utf-8\"},\n", + " json={\"sql\": query},\n", + " stream=True,\n", + " )\n", + "\n", + " for line in response.iter_lines():\n", + " if line:\n", + " print(line.decode(\"utf-8\"))" + ] + }, + { + "cell_type": "markdown", + "id": "6d66bd05", + "metadata": {}, + "source": [ + "\n", + "Since you haven't created data would trigger a provider spike yet, nothing will show here yet. To create this data, write to the `medical_claims` stream. \n", + "\n", + "This requires a key in the cluster itself. Navigate to your cluster and then to `API Keys` and create a new key. This key will be scoped to the cluster itself and so can be used as a producer of events. Download this key and copy the `API key`, `API secret` into your `.env` as `SASL_USERNAME` and `SASL_PASSWORD`. Then copy the `Bootstrap server` details as `BOOTSTRAP_SERVERS`.\n", + "\n", + "Now, open a second terminal window and run the following python code. This code block first gets the data contract from the schema registry and uses that to ensure that the message being sent contains all of the correct fields. This data is then sent through the derived streams to demonstrate how data flows through real-time data processing systems.\n", + "\n", + "This creates 20 high value claims which will show up in the base stream and then also in the provider spike stream since it generates 10 high value claims in a short time span for 2 different providers.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d20faf32", + "metadata": {}, + "outputs": [], + "source": [ + "# Setup Registry and Serializer\n", + "sr_conf = {\n", + " \"url\": str(os.getenv(\"REGISTRY_URL\")),\n", + " \"basic.auth.user.info\": f'{str(os.getenv(\"REQUESTS_API_KEY\"))}:{str(os.getenv(\"REQUESTS_API_SECRET\"))}',\n", + "}\n", + "schema_registry_client = SchemaRegistryClient(sr_conf)\n", + "\n", + "schema_str = schema_registry_client.get_version(\n", + " \"medical_claims-value\", schema_registry_client.get_versions(\"medical_claims-value\")[-1]\n", + ").schema.schema_str\n", + "\n", + "avro_serializer = AvroSerializer(\n", + " schema_registry_client, # type: ignore\n", + " schema_str=schema_str,\n", + ") # type: ignore\n", + "\n", + "new_claim = {\n", + " \"member_id\": 1000,\n", + " \"claim_id\": 1000,\n", + " \"provider_id\": 500,\n", + " \"procedure_code\": \"ABCDEF\",\n", + " \"diagnosis_code\": \"GHIJKLM\",\n", + " \"claim_amount\": 1100.0,\n", + " \"claim_status\": 1,\n", + " \"event_time\": str(datetime.datetime.now()),\n", + "}\n", + "\n", + "producer_conf = {\n", + " \"bootstrap.servers\": os.getenv(\"BOOTSTRAP_SERVERS\"),\n", + " # Required for Confluent Cloud\n", + " \"security.protocol\": \"SASL_SSL\",\n", + " \"sasl.mechanism\": \"PLAIN\",\n", + " # Confluent Cloud API credentials\n", + " \"sasl.username\": os.getenv(\"SASL_USERNAME\"),\n", + " \"sasl.password\": os.getenv(\"SASL_PASSWORD\"),\n", + "}\n", + "\n", + "producer = SerializingProducer({**producer_conf, \"value.serializer\": avro_serializer})\n", + "\n", + "for i in range(20):\n", + " # flagged procedures = '7371', '2710', '1831'\n", + "\n", + " high_value_claim = {\n", + " \"member_id\": randint(0, 10000),\n", + " \"claim_id\": randint(0, 10000),\n", + " \"provider_id\": randint(0, 10000),\n", + " \"procedure_code\": \"ABCDEF\",\n", + " \"diagnosis_code\": \"GHIJKL\",\n", + " \"claim_amount\": 10000.0 + randint(0, 10000),\n", + " \"claim_status\": 1,\n", + " \"event_time\": str(datetime.datetime.now()),\n", + " }\n", + "\n", + " sleep(randint(0, 10))\n", + "\n", + " producer.produce(topic=\"medical_claims\", value=high_value_claim)\n", + "\n", + " producer.flush()" + ] + }, + { + "cell_type": "markdown", + "id": "c2fc0b57", + "metadata": {}, + "source": [ + "To create a provider claim spike, run the following:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "41c46557", + "metadata": {}, + "outputs": [], + "source": [ + "for i in range(20):\n", + " suspicious_claim = {\n", + " \"member_id\": 1000,\n", + " \"claim_id\": 1001,\n", + " \"provider_id\": 502, # same provider on each claim\n", + " \"procedure_code\": \"7371\", # flagged procedures = '7371', '2710', '1831'\n", + " \"diagnosis_code\": \"GHIJKLM\",\n", + " \"claim_amount\": 6000.0 + (i * 100),\n", + " \"claim_status\": 1,\n", + " \"event_time\": str(datetime.datetime.now()),\n", + " }\n", + "\n", + " producer.produce(topic=\"medical_claims\", value=suspicious_claim)\n", + "\n", + " producer.flush()" + ] + }, + { + "cell_type": "markdown", + "id": "59a78f5c", + "metadata": {}, + "source": [ + "\n", + "You'll see the terminal window with the derived topics capture the provider spike from the streamed medical claims and print that value to the terminal window.\n", + "\n", + "## Step 5 - Set up Tableflow\n", + "\n", + "Another key offering of Confluent Cloud is Tableflow, which allows you to quickly and easily create Iceberg tables that store metadata and snapshots of stored messages. Iceberg is an open source data store that creates snapshots of real-time data streaming systems. These can be ingested into analytics engines like IBMs watsonx.data or other providers like Snowflake or Amazon S3.\n", + "\n", + "Go to the topics view in Confluent Cloud and select the generated High Value flow. The generated name from the ksql operation will be something like `pksqlc-xxxxxxx-HIGH_VALUE_CLAIMS`. \n", + "\n", + "![](./images/7_enable_tableflow.png)\n", + "\n", + "Open the topic and click 'Enable Tableflow' in the upper right hand. Select 'Iceberg' and 'Use Confluent Storage'. \n", + "\n", + "![](./images/8_tableflow.png)\n", + "\n", + "This stores Iceberg snapshots of all events, along with the associated metadata, creating a widely compatible data asset for consumption by any analytics engine or storage in a data center.\n", + "\n", + "## Step 6 - Query Tableflow Iceberg\n", + "\n", + "In order to query the Iceberg datasets create, you'll need to copy the environment ID from the Environment > Details view and the Organization ID from your organizations. Save this to your .env file as `ENVIRONMENT_FOR_ICEBERG`. \n", + "\n", + "Then copy the Organization ID from your Organization tab:\n", + "\n", + "![](images/10_org_details.png)\n", + "\n", + "Save this your .env file as `ORGANIZATION_FOR_ICEBERG`.\n", + "\n", + "You can test the Iceberg table creation with the following code:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2024865a", + "metadata": {}, + "outputs": [], + "source": [ + "from pyiceberg.catalog import load_catalog\n", + "\n", + "catalog = load_catalog(\n", + " \"confluent\",\n", + " type=\"rest\",\n", + " uri=(\n", + " \"https://tableflow.us-east-2.aws.confluent.cloud/\"\n", + " \"iceberg/catalog/\"\n", + " f\"{os.getenv(\"ORGANIZATION_FOR_ICEBERG\")}\"\n", + " f\"{os.getenv(\"ENV_ID\")}\"\n", + " ),\n", + " credential=(f'{str(os.getenv(\"REQUESTS_API_KEY\"))}:{str(os.getenv(\"REQUESTS_API_SECRET\"))}'),\n", + " header={\"X-Iceberg-Access-Delegation\": \"vended-credentials\"}, # pyright: ignore[reportArgumentType]\n", + ")\n", + "\n", + "print(catalog.list_namespaces())\n", + "\n", + "ns = catalog.list_namespaces()\n", + "\n", + "ns_name, table_name = catalog.list_tables(ns[0])[0]\n", + "\n", + "table = catalog.load_table(f\"{ns_name}.{table_name}\")\n", + "\n", + "# 3. Query with filters and column selection\n", + "df = table.scan(limit=100).to_pandas()\n", + "\n", + "print(df.head())" + ] + }, + { + "cell_type": "markdown", + "id": "d9c3447b", + "metadata": {}, + "source": [ + "This will show any data created in the Iceberg table.\n", + "\n", + "## Step 7 - Dashboard\n", + "\n", + "Finally, you may want to enable Wellspring Health to view a data visualization dashboard for the high value claims and provider claim spike datasets. To do this, you'll create an app using the Streamlit framework that utilizes the data pipelines enabled by Confluent Cloud to provide a dashboard view to stakeholders.\n", + "\n", + "Open the [dashboard_two_stream.py](dashboard_two_stream.py) from the repository to see the Streamlit app. You can run this streamlit app using the following command in your Python environment:\n", + "\n", + "```\n", + "streamlit run dashboard_two_stream.py\n", + "```\n", + "\n", + "This shows the power of how data streaming platforms like Confluent Cloud can create a modern platform to centralize and simplify Kafka deployments and streamline data collection, validation and data storage. Ensuring that schemas are correctly applied and automatically generating Iceberg tables doesn't require instrumentation or external storage. This all enables high volumes of data to be processed in a low-latency, near real-time fashion that enables real-time insights from a variety of data sources." + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py b/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py new file mode 100644 index 000000000..69068a4cf --- /dev/null +++ b/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py @@ -0,0 +1,233 @@ +import datetime +import json +import os +import queue +import threading +from datetime import datetime + +import pandas as pd +import requests +import streamlit as st +from confluent_kafka.schema_registry import SchemaRegistryClient +from dotenv import load_dotenv +from requests.auth import HTTPBasicAuth + +# Load env variables +load_dotenv() + +st.set_page_config(page_title="Fraudulent Claims Dashboard", layout="wide") + +st.markdown( + """ + +""", + unsafe_allow_html=True, +) + +st.title("Claims Streaming Dashboard") +st.markdown( + """This dashboard continuously queries a ksqlDB streams and displays high value medical claims in real time.""" +) + +from streamlit_autorefresh import st_autorefresh + +st_autorefresh(interval=2000, key="stream_refresh") + +# ------------------------------------------------------------------- +# Configuration +# ------------------------------------------------------------------- + +KSQL_ENDPOINT = st.sidebar.text_input("ksqlDB Endpoint", value=os.getenv("KSQL_ENDPOINT")) + +KSQL_API_KEY = st.sidebar.text_input("KSQL API Key", type="password") + +KSQL_API_SECRET = st.sidebar.text_input("KSQL API Secret", type="password") + +MAX_ROWS = st.sidebar.slider("Maximum Rows", min_value=10, max_value=500, value=100, step=10) + +START_STREAM = st.sidebar.button("Start Stream") + +### Queries +PROVIDER_SPIKE_QUERY = """ + SELECT * + FROM provider_claim_spikes + EMIT CHANGES; +""" + +HIGH_VALUE_QUERY = """ + SELECT * + FROM high_value_claims + EMIT CHANGES; +""" + +if "claims" not in st.session_state: + st.session_state.claims = [] + st.session_state.spikes = [] + st.session_state.stream_started = False + st.session_state["event_queue"] = queue.Queue() + +# get all of our schemas once +if "high_val_cols" not in st.session_state: + sr_conf = { + "url": os.getenv("REGISTRY_URL"), + "basic.auth.user.info": f'{str(os.getenv("REQUESTS_API_KEY"))}:{str(os.getenv("REQUESTS_API_SECRET"))}', + } + schema_registry_client = SchemaRegistryClient(sr_conf) + + # note that your generated stream name may be different + high_val_schema_str = schema_registry_client.get_version( + "pksqlc-4my2e5kHIGH_VALUE_CLAIMS-value", # pragma: allowlist secret + schema_registry_client.get_versions( + "pksqlc-4my2e5kHIGH_VALUE_CLAIMS-value" # pragma: allowlist secret + )[-1], + ).schema.schema_str + + print(high_val_schema_str) + + schema = json.loads(high_val_schema_str) # type: ignore + st.session_state.high_val_cols = [f["name"] for f in schema["fields"]] + + # note that your generated stream name may be different + spike_schema_str = schema_registry_client.get_version( + "pksqlc-4my2e5kPROVIDER_CLAIM_SPIKES-value", # pragma: allowlist secret + schema_registry_client.get_versions( + "pksqlc-4my2e5kPROVIDER_CLAIM_SPIKES-value" # pragma: allowlist secret + )[-1], + ).schema.schema_str + + schema = json.loads(spike_schema_str) # type: ignore + st.session_state.spike_cols = [f["name"] for f in schema["fields"]] + + print(spike_schema_str) + + +st.subheader("High Value Claims") +hv_metrics = st.container(height=500) +hv_table = st.empty() + +st.subheader("Provider Claim Spikes") +ps_metrics = st.container(height=500) +ps_table = st.empty() + +placeholder_status = st.empty() + + +def stream_query(query_name, sql, event_queue, data_columns): + print("streaming") + + response = requests.post( + str(os.getenv("KSQL_ENDPOINT")), + auth=HTTPBasicAuth(KSQL_API_KEY, KSQL_API_SECRET), + headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"}, + json={"sql": sql}, + stream=True, + ) + + columns = None + + for line in response.iter_lines(): + if not line: + continue + + decoded = line.decode("utf-8") + + try: + record = json.loads(decoded) + print(record) + except Exception as e: + print(e) + continue + + # First row is columns + if columns is None and isinstance(record, dict): + columns = record["columnNames"] + continue + + values = record + + row = dict(zip(columns, values, strict=False)) # type: ignore + + row["stream"] = query_name # type: ignore + + event_queue.put(row) + + +if START_STREAM and not st.session_state.stream_started: + st.session_state.stream_started = True + + print("starting") + + try: + threading.Thread( + target=stream_query, + args=( + "spike", + PROVIDER_SPIKE_QUERY, + st.session_state.event_queue, + st.session_state.spike_cols, + ), + daemon=True, + ).start() + + threading.Thread( + target=stream_query, + args=( + "high_val_claim", + HIGH_VALUE_QUERY, + st.session_state.event_queue, + st.session_state.high_val_cols, + ), + daemon=True, + ).start() + + except requests.exceptions.RequestException as e: + placeholder_status.error(f"Connection error: {e}") + + except Exception as e: + placeholder_status.error(f"Unexpected error: {e}") + +## outside of block +while not st.session_state.event_queue.empty(): + print("event") + + event = st.session_state.event_queue.get() + + if event["stream"] == "spike": + data_rows = {k: v for k, v in event.items() if k != "stream"} + data_rows["WINDOWSTART"] = datetime.datetime.fromtimestamp( + data_rows["WINDOWSTART"] / 1000 + ).strftime("%c") + data_rows["WINDOWEND"] = datetime.datetime.fromtimestamp( + data_rows["WINDOWEND"] / 1000 + ).strftime("%c") + st.session_state.spikes.insert(0, data_rows) + + if event["stream"] == "high_val_claim": + data_rows = {k: v for k, v in event.items() if k != "stream"} + st.session_state.claims.insert(0, data_rows) + +claims_df = pd.DataFrame(st.session_state.claims, columns=st.session_state.high_val_cols) + +with hv_metrics.container(): + if len(claims_df) > 0: + col1, col2 = st.columns(2) + + col1.metric("High Value Claims", len(claims_df)) + col2.metric("Total High Claim Amount", f"${claims_df['CLAIM_AMOUNT'].sum():,.2f}") + + st.dataframe(claims_df, width="stretch", height=300) + +spikes_df = pd.DataFrame(st.session_state.spikes, columns=st.session_state.spike_cols) + +with ps_metrics.container(): + if len(spikes_df) > 0: + ps_col1, ps_col2 = st.columns(2) + + ps_col1.metric("Claim Spikes Observed", len(spikes_df)) + ps_col2.metric("Total Claim Spike Value", f"${spikes_df['TOTAL_CLAIM_VALUE'].sum():,.2f}") + + st.dataframe(spikes_df, width="stretch", height=300) diff --git a/tutorials/17-data-streaming/confluent-modernization/requirements.txt b/tutorials/17-data-streaming/confluent-modernization/requirements.txt new file mode 100644 index 000000000..f8fa9092f --- /dev/null +++ b/tutorials/17-data-streaming/confluent-modernization/requirements.txt @@ -0,0 +1,51 @@ +altair==6.1.0 +anyio==4.13.0 +attrs==26.1.0 +Authlib==1.7.2 +blinker==1.9.0 +cachetools==7.1.1 +certifi==2026.4.22 +cffi==2.0.0 +charset-normalizer==3.4.7 +click==8.3.3 +confluent-kafka==2.14.0 +cryptography==48.0.0 +fastavro==1.12.2 +gitdb==4.0.12 +GitPython==3.1.50 +h11==0.16.0 +httpcore==1.0.9 +httptools==0.7.1 +httpx==0.28.1 +idna==3.15 +itsdangerous==2.2.0 +Jinja2==3.1.6 +joserfc==1.6.5 +jsonschema==4.26.0 +jsonschema-specifications==2025.9.1 +MarkupSafe==3.0.3 +narwhals==2.21.0 +numpy==2.4.5 +packaging==26.2 +pandas==3.0.3 +pillow==12.2.0 +protobuf==7.34.1 +pyarrow==24.0.0 +pycparser==3.0 +pydeck==0.9.2 +python-dateutil==2.9.0.post0 +python-multipart==0.0.28 +referencing==0.37.0 +requests==2.34.2 +rpds-py==0.30.0 +six==1.17.0 +smmap==5.0.3 +starlette==1.0.0 +streamlit==1.57.0 +streamlit-autorefresh==1.0.1 +tenacity==9.1.4 +toml==0.10.2 +typing_extensions==4.15.0 +urllib3==2.7.0 +uvicorn==0.47.0 +websockets==16.0 From 18ebea3ab34d8da10e0047e6078f7888c811eeec Mon Sep 17 00:00:00 2001 From: joshua noble Date: Fri, 12 Jun 2026 10:21:43 -0700 Subject: [PATCH 02/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../confluent_modernization_notebook.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb index 6867ac897..ded43b182 100644 --- a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb +++ b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb @@ -35,7 +35,7 @@ "\n", "# Step 1\n", "\n", - "Create an account in Confluent Cloud. You can use Github, Google, or simply use your own email address. Answer the initial questions however you'd like. When you reach the stage to create your own cluster, select \"Explore other cluster types and pricing\".\n", + "Create an account in Confluent Cloud. You can use GitHub, Google, or simply use your own email address. Answer the initial questions however you'd like. When you reach the stage to create your own cluster, select \"Explore other cluster types and pricing\".\n", "\n", "![](./images/sign_up.png)\n", "\n", From ce69fd7898ffa216543ced7e8b2789a90792bfdd Mon Sep 17 00:00:00 2001 From: joshua noble Date: Fri, 12 Jun 2026 10:21:53 -0700 Subject: [PATCH 03/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../confluent-modernization/dashboard_two_stream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py b/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py index 69068a4cf..4f488fc0b 100644 --- a/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py +++ b/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py @@ -3,7 +3,6 @@ import os import queue import threading -from datetime import datetime import pandas as pd import requests From a266832562ffa0f68a75de62bfe1f87526345d0a Mon Sep 17 00:00:00 2001 From: joshua noble Date: Fri, 12 Jun 2026 10:23:21 -0700 Subject: [PATCH 04/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../confluent_modernization_notebook.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb index ded43b182..a88a30e49 100644 --- a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb +++ b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb @@ -27,7 +27,7 @@ "2. Provide access controls to govern sensitive data structures\n", "3. Easily provide Iceberg snapshots for ingestion into a data lake or data warehouse\n", "\n", - "In this tutorial, you'll see how Wellspring can modernize and optimize their data infrastructre and Kafka architectures with Confluent to easily create derived big data streams, improve data processing, and create dashboards. This will give them a more scalable and unified data infrastructure and analytics ecosystem.\n", + "In this tutorial, you'll see how Wellspring can modernize and optimize their data infrastructure and Kafka architectures with Confluent to easily create derived big data streams, improve data processing, and create dashboards. This will give them a more scalable and unified data infrastructure and analytics ecosystem.\n", "\n", "The first step along this path is to take the Kafka streams that they're already using in an on-prem environment and move them to Confluent Cloud. \n", "\n", From 8a03eb976be252ce2339a09fb7d096c3a2785ae9 Mon Sep 17 00:00:00 2001 From: joshua noble Date: Fri, 12 Jun 2026 10:23:40 -0700 Subject: [PATCH 05/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../confluent_modernization_notebook.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb index a88a30e49..42e1c553c 100644 --- a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb +++ b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb @@ -31,7 +31,7 @@ "\n", "The first step along this path is to take the Kafka streams that they're already using in an on-prem environment and move them to Confluent Cloud. \n", "\n", - "You can find the code sections of this tutorial in our [Github repository](https://github.com/IBM/ibmdotcom-tutorials) in two different files: confluent_modernization_notebook.ipynb and dashboard_two_stream.py \n", + "You can find the code sections of this tutorial in our [GitHub repository](https://github.com/IBM/ibmdotcom-tutorials) in two different files: confluent_modernization_notebook.ipynb and dashboard_two_stream.py \n", "\n", "# Step 1\n", "\n", From 88e88598bdc3eb00551d4b641307e4f6730e6f6f Mon Sep 17 00:00:00 2001 From: joshua noble Date: Fri, 12 Jun 2026 10:23:55 -0700 Subject: [PATCH 06/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../confluent_modernization_notebook.ipynb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb index 42e1c553c..4415c184c 100644 --- a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb +++ b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb @@ -412,9 +412,9 @@ "metadata": {}, "source": [ "\n", - "You can also create complex windowing. For instance, there are certain kinds of procudures that should trigger an audit and multiple auditable claims in a short period of time indicate that a providers systems may have been compromised in a data breach.\n", - "\n", - "To help track this, you'll create an `auditor_stream` of claims that are over $5000 for procudures '7371', '2710', and '1831'. Then, create a new stream that captures whether there are multiple items in the auditor stream in any 10 minute period and capture that as `provider_claim_spikes`." + "You can also create complex windowing. For instance, there are certain kinds of procedures that should trigger an audit, and multiple auditable claims in a short period of time indicate that a provider's systems may have been compromised in a data breach.\n", + "\n", + "To help track this, you'll create an `auditor_stream` of claims that are over $5000 for procedures '7371', '2710', and '1831'. Then, create a new stream that captures whether there are multiple items in the auditor stream in any 10 minute period and capture that as `provider_claim_spikes`." ] }, { From 7940864653d38b452ae6021812e648e6d0d23478 Mon Sep 17 00:00:00 2001 From: joshua noble Date: Fri, 12 Jun 2026 10:24:47 -0700 Subject: [PATCH 07/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../confluent_modernization_notebook.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb index 4415c184c..05c2afcb4 100644 --- a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb +++ b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb @@ -142,7 +142,7 @@ "metadata": {}, "outputs": [], "source": [ - "%pip install pyiceberg requests confluent_kafka dotenv" + "%pip install pyiceberg requests confluent-kafka python-dotenv" ] }, { From 942e48cdcc0343b90147b6c49b10c60dac7f2e77 Mon Sep 17 00:00:00 2001 From: joshua noble Date: Fri, 12 Jun 2026 10:25:06 -0700 Subject: [PATCH 08/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../confluent-modernization/dashboard_two_stream.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py b/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py index 4f488fc0b..e466bf8a9 100644 --- a/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py +++ b/tutorials/17-data-streaming/confluent-modernization/dashboard_two_stream.py @@ -208,6 +208,7 @@ def stream_query(query_name, sql, event_queue, data_columns): if event["stream"] == "high_val_claim": data_rows = {k: v for k, v in event.items() if k != "stream"} st.session_state.claims.insert(0, data_rows) + st.session_state.claims = st.session_state.claims[:MAX_ROWS] claims_df = pd.DataFrame(st.session_state.claims, columns=st.session_state.high_val_cols) From dc49652c54bd8148b1f25777f4a5aedc3b45f348 Mon Sep 17 00:00:00 2001 From: joshua noble Date: Fri, 12 Jun 2026 10:25:30 -0700 Subject: [PATCH 09/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../confluent_modernization_notebook.ipynb | 1 + 1 file changed, 1 insertion(+) diff --git a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb index 05c2afcb4..896249b8f 100644 --- a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb +++ b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb @@ -153,6 +153,7 @@ "outputs": [], "source": [ "import datetime\n", + "import json\n", "import os\n", "from random import randint\n", "from time import sleep\n", From 9a8c94e6f75fc6ac43fd02245cb9f80b263530c1 Mon Sep 17 00:00:00 2001 From: joshua noble Date: Fri, 12 Jun 2026 10:25:46 -0700 Subject: [PATCH 10/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../confluent_modernization_notebook.ipynb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb index 896249b8f..2c49f06ca 100644 --- a/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb +++ b/tutorials/17-data-streaming/confluent-modernization/confluent_modernization_notebook.ipynb @@ -731,8 +731,8 @@ " uri=(\n", " \"https://tableflow.us-east-2.aws.confluent.cloud/\"\n", " \"iceberg/catalog/\"\n", - " f\"{os.getenv(\"ORGANIZATION_FOR_ICEBERG\")}\"\n", - " f\"{os.getenv(\"ENV_ID\")}\"\n", + " f\"{os.getenv('ORGANIZATION_FOR_ICEBERG')}/\"\n", + " f\"{os.getenv('ENVIRONMENT_FOR_ICEBERG')}\"\n", " ),\n", " credential=(f'{str(os.getenv(\"REQUESTS_API_KEY\"))}:{str(os.getenv(\"REQUESTS_API_SECRET\"))}'),\n", " header={\"X-Iceberg-Access-Delegation\": \"vended-credentials\"}, # pyright: ignore[reportArgumentType]\n",