Real-Time Messaging with Apache Kafka






Why a Real-Time Messaging System Is Required


If we want to build a Machine Learning model, the most important requirement is to read and handle data, and that is where messaging systems play a major role in learning trends.


There are a variety of examples here, and we will go with the most common one.


Let's say you have a bank account and you regularly make transactions in the 1,000s, 2,000s, and 3,000s, never exceeding 10,000.


If all of a sudden you make a transaction of 1 lakh, you will get an alert or a call on the phone number registered with the bank.


This is how a change in trend is recognised, and an alert is sent.


Now let's say the situation changes and you are transacting in lakhs instead of thousands, for obvious reasons like starting a business or a startup.


Your bank now needs to update its picture of your transactions so the old alerts stop, and set a new rule such as: if a transaction happens in crores, send an alert.


And this is not limited to banking transactions; the concept applies in manufacturing industries as well. Let's say we have an oil manufacturing plant where boilers have to run at a certain temperature; if the temperature exceeds a certain point, an alert has to be sent before the situation turns dangerous, so that action can be taken.


Machine Learning work like this, analysing patterns in existing data and alerting when new data does not fit, plays a major role in the growth of many industries.


For the above example, we will use Apache Kafka and Apache Flink.


Apache Kafka can carry real-time data, and Apache Flink can aggregate that data to check patterns and help us understand the situation; different databases can also be connected to Apache Flink.


In this session we will concentrate on Apache Kafka; we will discuss Apache Flink in further sessions.


Let us understand Apache Kafka better with a demo and some hands-on work.



Apache Kafka Demo


For this demo, install Docker Desktop.

Note: this project was built on a Mac. You can follow the same steps on Windows with small variations in the commands, or you can run a Linux environment by installing Ubuntu from the Microsoft Store (WSL), which can make things easier.

Open a terminal, and let's create a directory and add a Docker Compose file:

mkdir kafka-demo
cd kafka-demo
vi docker-compose.yml

and paste the following contents:



services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    depends_on:
      - kafka







Now we run the docker compose up command:


docker compose up -d




Now check Docker Desktop and you should see two containers running.




According to the Compose file, we are running Kafka on port 9092 on our local machine and Kafka UI on port 8080.
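
If you prefer the terminal to Docker Desktop, a quick check (purely a convenience command, not part of the original setup) is to list the running containers whose names contain kafka; this matches both the kafka and kafka-ui containers:

docker ps --filter "name=kafka"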



Understanding Kafka


Now let's create a messaging system for banking transaction fraud alerts. Here we will need a Producer, which produces messages; a Consumer, which consumes messages; a Topic, which makes the to-and-fro possible; and a Broker, which is the machine that stores and manages it all.

Simply put, if Kafka were a bank:

  • Producer → someone depositing money

  • Consumer → someone withdrawing money

  • Topic → the account

  • Broker → the bank branch that stores and manages the account

Let's understand this better with a demo.

Let's create a Topic, a Producer, and a Consumer.

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic test \
  --bootstrap-server kafka:9092 \
  --partitions 1 \
  --replication-factor 1



Now, to check the list of topics:

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server kafka:9092
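
You can also inspect the topic we just created, confirming the partition count and replication factor we passed to --create, with the same script's --describe option:

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --describe \
  --topic test \
  --bootstrap-server kafka:9092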


Now let us create a producer:

docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server kafka:9092 \
  --topic test


This actually gives you a console for input. Before you begin, open a new terminal window for our consumer.




Now, in the new tab, create a consumer:
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic test \
  --from-beginning



What happens here is that any message from the producer is received by the consumer in real time, and the producer and consumer are connected by the topic.

You can test this by typing any message in the producer terminal and checking the consumer terminal.

Producer terminal


Consumer Terminal


The data here is sent and received in real-time
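
A small aside: Kafka messages can also carry a key, which Kafka uses to choose a partition, so all messages with the same key stay in order. A minimal sketch with the same console tools on our test topic, using the producer's parse.key and key.separator properties:

docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server kafka:9092 \
  --topic test \
  --property "parse.key=true" \
  --property "key.separator=:"

Typing C001:deposited 500 in this producer sends key C001 with value deposited 500, and a consumer started with --property "print.key=true" will display both.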

Also, since we set up Kafka UI as well, open Docker Desktop to access it easily (it is mapped to http://localhost:8080),


and you will be able to access the Kafka dashboard.





Here you can see the number of messages, Topics, consumers, and Brokers. Brokers are nothing but instances; since we deal with large amounts of data in real time, we can add additional Brokers to handle it, and that flexibility to grow horizontally is what Kafka offers.
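
Partitions are what make that horizontal growth concrete: a topic split into several partitions can be spread across brokers, and multiple consumers in one group can read the partitions in parallel. As a sketch, this creates a three-partition topic (the name scaled-test is just for illustration; with our single broker the replication factor must stay 1):

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic scaled-test \
  --bootstrap-server kafka:9092 \
  --partitions 3 \
  --replication-factor 1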


Banking Transaction Alert


Let us apply this in real time to banking transactions by creating two topics, one for transactions and the other for alerts.

But before the project, let us make a small change to the Docker Compose file so that Kafka can be reached from our localhost.

Turn down the existing Docker setup:

docker compose down -v



Now edit the Compose file so that Kafka advertises a listener on localhost.
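
A minimal way to do this, assuming we keep the single PLAINTEXT listener from the Compose file above, is to change the advertised listener in the kafka service's environment so the broker tells clients to reach it at localhost:

      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"

With this change, host-side clients such as the Python scripts below can connect to localhost:9092. (In a more complete setup, container-internal clients like kafka-ui would get their own listener entry instead.)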



And now let us bring the containers back up:

docker compose up -d



Now let us create two topics, one for transactions and one for fraud-alerts:

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic transactions \
  --bootstrap-server kafka:9092 \
  --partitions 1 \
  --replication-factor 1




docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic fraud-alerts \
  --bootstrap-server kafka:9092 \
  --partitions 1 \
  --replication-factor 1


Now install kafka-python, and also make sure Python is installed on your local machine:


pip install kafka-python



And now, locally, create a file named producer.py:


producer.py

import json
import time
import random
from kafka import KafkaProducer

# serialize each message dict as JSON bytes on the wire
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

customers = ["C001", "C002", "C003", "C004"]

while True:
    # pick either a normal amount (100-200) or a suspiciously large one (2000-5000)
    amount = random.choice([random.randint(100, 200), random.randint(2000, 5000)])

    data = {
        "customer_id": random.choice(customers),
        "transaction_amount": amount,
        "timestamp": time.time()
    }

    print("Sending:", data)
    producer.send("transactions", value=data)

    time.sleep(1)





Before running the producer.py file, let us also create a consumer.py file; open a new terminal window.
consumer.py
import json
from kafka import KafkaConsumer, KafkaProducer
# consumer
consumer = KafkaConsumer(
    "transactions",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8"))
)

# producer for alerts
alert_producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

print("Fraud detector started...")

for msg in consumer:
    transaction = msg.value
    amount = transaction["transaction_amount"]

    # fraud rule
    if amount > 2000:
        alert = {
            "customer_id": transaction["customer_id"],
            "transaction_amount": amount,
            "timestamp": transaction["timestamp"],
            "alert": "SUSPICIOUS_TRANSACTION"
        }

        print("⚠️ FRAUD DETECTED:", alert)
        alert_producer.send("fraud-alerts", value=alert)
    else:
        print("OK:", transaction)




Now, before we run the files, let us understand what each does. The producer.py file has data for 4 customers, C001, C002, C003, and C004, and randomly generates transactions for each customer, a few in the 100s and a few in the 1000s.

The consumer.py file receives these messages from the producer; if the amount is more than 2000, it sends an alert to the fraud-alerts topic. In a real use case, this kind of pattern aggregation and comparison can be done by Machine Learning models, which we will learn in further sessions.
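
To confirm the alerts really land in the fraud-alerts topic, you can attach the console consumer from earlier to that topic in yet another terminal (a convenience check, separate from the two Python scripts):

docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic fraud-alerts \
  --from-beginning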


Now let us run the producer.py and consumer.py files:


python3 producer.py



Now run consumer.py in a different tab:

python3 consumer.py



As you can see, it gives us the suspicious-transaction data. The consumer also waits for messages from the producer: if the producer is interrupted, the consumer just keeps waiting, and when the producer starts producing messages again, the consumer shows them in real time.
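
That waiting behaviour comes from kafka-python's consumer iterator, which blocks forever by default. If you ever want the loop to exit when no message arrives for a while, the KafkaConsumer constructor accepts a consumer_timeout_ms argument; a small sketch of that variation (not needed for this demo):

import json
from kafka import KafkaConsumer

# give up iterating if no message arrives within 10 seconds
consumer = KafkaConsumer(
    "transactions",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,
    value_deserializer=lambda v: json.loads(v.decode("utf-8"))
)

for msg in consumer:
    print("Received:", msg.value)

print("No new messages for 10 seconds, exiting")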

We will use this real-time messaging concept in a further session for a Machine Learning model.

This concludes the blog.
