[ Big data ] Apache Kafka 使用 Python 介接

本文章將展示如何使用 Python 串接 Kafka Message Broker,並傳送模擬系統 Log 資料至 Topic,如果對 Kafka 介紹、安裝或使用方式還不了解的朋友,可以參考之前寫的文章 Apache Kafka Message Broker 使用教學

Kafka 部分

Step 1.

確認目前 Kafka 服務所執行的 IP 及 Port,可透過下列 Linux 指令去查詢,正常情況下是跑在 127.0.0.1:6667 上。

netstat -tlnp  

Python 部分

第一步

  • 請依下列指令安裝 pykafka 套件。
pip install pykafka  

第二步

撰寫 Python 程式,並執行此程式碼,該程式主要是使用亂數的方法,模擬系統 Log 資訊,並將這些資訊以每 0.5 秒的頻率持續傳送至 Kafka 中的 Topic。

from pykafka import KafkaClient  
import time  
import random  
client = KafkaClient(hosts="172.16.1.81:6667")  
client.topics  
topic = client.topics['queue_test'] 


def gen():  
    TIMES = time.strftime("%H:%M:%S", time.localtime())
    Service = ['Web','DB','Network','Server','Ueser']
    Status =['warning','warning','warning','info','info','info','info','info','info','error']
    while True:
        TIMES = time.strftime("%H:%M:%S", time.localtime())
        oneService = random.choice(Service)
        oneStatus = random.choice(Status)
        Data =  TIMES +" "+oneService +" "+oneStatus
        return Data

with topic.get_sync_producer() as producer:  
    while True:
        producer.produce(gen())
        time.sleep(0.5)

第三步

執行成功時,我們可以透過 Kafka 指令確認 Topic 是否有建立完成,並且透過 Consumer 指令去將 Topic 內的訊息給接收下來。(指令 hdp 路徑需要依照您安裝的版本去做調整)

  • 列出目前 Topic 清單
/usr/hdp/2.4.0.0-169/kafka/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
  • 從 Topic 中接受訊息
/usr/hdp/2.4.0.0-169/kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic queue_test --from-beginning

版本資訊

  • Python v2.7.1
  • CentOS 6

Ben Shiue

Having being a full stack engineer. His interests in Node.js, ARM mbed, IoT solutions. Contact us : [email protected]

ALL RIGHTS RESERVED. COPYRIGHT © 2016. Designed and Coded by Makee.io