Why a Real-Time Messaging System Is Required
If we want to build a Machine Learning model, the most important thing we need is the ability to read and handle data, and that is where messaging systems play a major role in learning trends.
There are plenty of examples to choose from; let's go with the most common one.
Let's say you have a bank account and you regularly make transactions of 1,000, 2,000, 3,000, never exceeding 10,000. If all of a sudden you make a transaction of 1 lakh (100,000), you will get an alert, or a call on the phone number registered with the bank.
This is how a change in trend is recognised, and an alert is sent.
Now let's say the situation changes and you start transacting in lakhs instead of thousands, for an obvious reason such as starting a business or a startup.
Your bank now needs to update its picture of your transaction history so the old alerts stop, and set a new rule instead, for example: send an alert only if a transaction crosses a crore.
And not just banking transactions; this concept applies in manufacturing industries as well. Say we have an oil refinery where boilers have to run at a certain temperature: if the temperature exceeds a certain point, an alert has to be sent before the situation turns dangerous, so that action can be taken.
Analysing patterns in existing data and alerting when new data does not fit the pattern is the role machine learning plays here, and it is a major contributor to growth in many industries.
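To make that idea concrete, here is a tiny sketch in Python. This is not a real ML model, just a rolling baseline that flags any amount far above a customer's usual range; the three-standard-deviation threshold and the sample history are assumptions chosen purely for illustration.

```python
import statistics

def is_suspicious(history, amount, sigmas=3.0):
    """Flag `amount` if it sits more than `sigmas` standard
    deviations above the mean of the customer's past transactions."""
    if len(history) < 2:
        return False  # not enough data to judge a trend yet
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    return amount > mean + sigmas * stdev

# a customer who usually transacts in the thousands
history = [1000, 2000, 3000, 2500, 1500]
print(is_suspicious(history, 2800))    # within the usual range -> False
print(is_suspicious(history, 100000))  # 1 lakh -> True, send an alert
```

Notice that if this customer's history later fills up with lakh-sized transactions, the same rule stops flagging lakhs and only flags far larger amounts, which is exactly the "update the trend, set a new alert" behaviour described above.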
For the above example, we will use Apache Kafka and Apache Flink.
Apache Kafka can carry real-time data, and Apache Flink can aggregate that data to check for patterns and help us understand the situation; different databases can also be plugged into Apache Flink.
In this session we will concentrate on Apache Kafka; Apache Flink will be covered in further sessions.
Let us understand Apache Kafka better with a demo and some hands-on practice.
Apache Kafka Demo
mkdir kafka-demo
cd kafka-demo
vi docker-compose.yml
and paste the following contents:
services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      # two broker listeners: PLAINTEXT for clients inside the docker
      # network (kafka-ui), PLAINTEXT_HOST for clients on your machine,
      # so that localhost:9092 works from the Python scripts below
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
    depends_on:
      - kafka

volumes:
  kafka_data:
and now we run the docker compose up command:
docker compose up -d
Now check Docker Desktop and you should see two containers running.
As per the compose file, Kafka is exposed on port 9092 on our local machine and Kafka UI on port 8080.
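If you prefer the terminal over Docker Desktop, a small helper like the one below can confirm both ports answer. It is a hypothetical check written for this post (plain Python sockets, nothing Kafka-specific); the host and port values match the compose file above.

```python
import socket

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    for name, port in [("kafka", 9092), ("kafka-ui", 8080)]:
        status = "up" if port_open("localhost", port) else "down"
        print(f"{name} (localhost:{port}): {status}")
```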
Understanding Kafka
Simply put, if Kafka were a bank:
- Producer → someone depositing money
- Consumer → someone withdrawing money
- Topic → the account
- Broker → the bank branch that stores and manages the account
Create a topic named test:

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic test \
  --bootstrap-server kafka:9092 \
  --partitions 1 \
  --replication-factor 1
List the topics to confirm it exists:

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server kafka:9092
Start a console producer (each line you type is sent as a message). Note that we use --bootstrap-server; the older --broker-list option is deprecated:

docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server kafka:9092 \
  --topic test
In another terminal, start a console consumer and you should see the messages you typed:

docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic test \
  --from-beginning
Banking Transaction Alert
First, tear down the earlier demo (-v also removes its volumes) and bring it back up:

docker compose down -v
docker compose up -d
Create two topics, one for raw transactions and one for alerts:

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic transactions \
  --bootstrap-server kafka:9092 \
  --partitions 1 \
  --replication-factor 1

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic fraud-alerts \
  --bootstrap-server kafka:9092 \
  --partitions 1 \
  --replication-factor 1
Save the following as producer.py; it simulates customer transactions (it needs the kafka-python package: pip install kafka-python):

# producer.py - simulates customer transactions
import json
import time
import random

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

customers = ["C001", "C002", "C003", "C004"]

while True:
    # normal transactions (100-200) OR occasional large ones (2000-5000)
    amount = random.choice([random.randint(100, 200), random.randint(2000, 5000)])
    data = {
        "customer_id": random.choice(customers),
        "transaction_amount": amount,
        "timestamp": time.time()
    }
    print("Sending:", data)
    producer.send("transactions", value=data)
    time.sleep(1)
And save this as consumer.py, the fraud detector:

# consumer.py - fraud detector
import json

from kafka import KafkaConsumer, KafkaProducer

# consumer for incoming transactions
consumer = KafkaConsumer(
    "transactions",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8"))
)

# producer for alerts
alert_producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

print("Fraud detector started...")

for msg in consumer:
    transaction = msg.value
    amount = transaction["transaction_amount"]

    # fraud rule: 2000 and above is suspicious
    # (>= so the 2000-5000 "fraud" range in producer.py is fully covered)
    if amount >= 2000:
        alert = {
            "customer_id": transaction["customer_id"],
            "transaction_amount": amount,
            "timestamp": transaction["timestamp"],
            "alert": "SUSPICIOUS_TRANSACTION"
        }
        print("⚠️ FRAUD DETECTED:", alert)
        # topic name matches the fraud-alerts topic created above
        alert_producer.send("fraud-alerts", value=alert)
    else:
        print("OK:", transaction)
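If you also want to watch the alerts topic itself, a third script can subscribe to fraud-alerts. This is a minimal sketch assuming the same local broker and the kafka-python package; format_alert is a helper name made up for this example:

```python
# alerts_listener.py - minimal sketch: watch the fraud-alerts topic
import json

def format_alert(alert):
    """Render one alert record as a single readable line."""
    return (f"[ALERT] customer={alert['customer_id']} "
            f"amount={alert['transaction_amount']}")

if __name__ == "__main__":
    # imported here so the helper above also works without a broker
    from kafka import KafkaConsumer  # pip install kafka-python

    consumer = KafkaConsumer(
        "fraud-alerts",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    for msg in consumer:
        print(format_alert(msg.value))
```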
Run them in two separate terminals:

python3 producer.py
python3 consumer.py

Normal transactions are printed as OK, and large ones are flagged and published to the alerts topic.