解釋pykafka參數最詳細的記錄

  1. Kafka

  1. 簡介

    Kafka 是一種分布式的、分區的、多副本的基于發布/訂閱的消息系統。它是通過 zookeeper 進行協調,常見可以用于 web/nginx 日志、訪問日志、消息服務等。主要應用場景為:日志收集系統和消息系統。

    Kafka 的主要設計目標如下:

      1. 以時間復雜度為 O(1) 的方式提供持久化能力,即使對 TB 級別以上的數據也能保證常數時間的訪問性能。

      2. 高吞吐率,即使在十分廉價的機器上也能實現單機支持每秒 100K 條消息的傳輸。

      3. 支持 Kafka Server (即 Kafka 集群的服務器)間的消息分區,及分布式消費,同時保證每個 partition 內的消息順序傳輸。

      4. 同時支持離線數據處理和實時數據處理

  2. Kafka 架構

圖片描述

    如上圖所示,一個 Kafka 集群由若干producer、若干consumer、若干broker,以及一個zookeeper集群所組成。Kafka通過zookeeper管理集群配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將消息發布到broker,consumer使用pull模式從broker訂閱并消費消息。

    Kafka名詞解釋:

      broker:消息中間件處理結點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群,相當于物理層面上的一臺服務器。

      topic:存放同一類消息的位置,是一個概念層面上的名詞,Kafka集群可以負責多個topic的分發。(物理上不同topic的消息分開存儲,邏輯上一個topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的topic即可生產或消費數據而不必關心數據存于何處)

      partition:topic在物理層面上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列,創建topic時可以指定partition數量,每個partition對應于一個文件夾,該文件夾下存儲該partition的數據和索引文件。一般來說partition的數量大于等于broker的數量。

      producer:負責發布消息到Kafka broker

      consumer:消費消息,每個consumer屬于一個特定的consumer group(可為每個consumer指定group name,若不指定group name則為默認的group)。使用consumer high level API時,同一topic的一條消息只能被一個consumer group的一個consumer消費,但多個consumer group可同時消費這條消息。

      consumer group:每個consumer屬于一個特定的consumer group,consumer group是實際記錄的概念。

  3. Kafka數據傳輸的事務特點

    1. at most once

      這種模式下consumer fetch消息,先進行commit,再進行處理。如果再處理消息的過程中出現異常,下次重新開始工作就無法讀到之前已經確認而未處理的消息。

    2. at least once

      消息至少發送一次,如果消息未能接受成功,可能會重發,直到接收成功。消費者fetch消息,然后處理消息,然后保存offset。如果消息處理成功之后,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是”at least once”,原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態。

    3. exactly once

      消息只會發送一次,Kafka中并沒有嚴格的去實現,我們認為這種策略在Kafka中是沒有必要的。

    通常情況下,Kafka默認保證at least once。

  5. Push & Pull

    作為一個消息系統,Kafka遵循了傳統的方式,選擇由producer向broker push消息,并由consumer從broker中pull消息。

    push模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目標就是以盡可能快的速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現是拒絕服務以及網絡擁塞。而pull模式可以根據consumer的消費能力以適當的速率消費消息。

  6. Topic & Partition

    Topic在邏輯上可以認為是一個存在的queue,每條消息都必須指定它的topic,可以簡單的理解為必須指明把這條消息放進哪個queue里。為了使Kafka的吞吐率可以水平擴展,物理上把topic分成一個或多個partition,每個partition在物理上對應一個文件夾,該文件夾下存儲這個partition的所有消息和索引文件。

    每個日志文件都是”log entries”序列,每一個log entry包含一個4字節整型值(值為N),其后跟N個字節的消息體。每條消息都有一個當前partition下唯一的64字節的offset,它指明了這條消息的起始位置,也是對數據的唯一標識,Kafka中并沒有提供額外的索引機制來存儲offset,因為在Kafka中幾乎不允許對消息進行”隨機讀寫”。磁盤上log entry的存儲格式如下:

      message length:4 bytes(它的具體值為1+4+n,如下所示)

      ”magic” value:1 byte

      crc:4 bytes

      payload:n bytes

    這個log entries并非由一個文件構成,而是分為多個segment,每個segment名為該segment第一條消息的offset和”.kafka”組成。另外會有一個索引文件,它標明了每個segment下包含的log entry的offset范圍。

    因為每條消息都被append到該partition中,是順序寫磁盤,因此效率非常高(經驗證,順序寫磁盤比隨機寫內存還要高,這是Kafka高吞吐率的一個保證)。

    每條消息被發送到topic時,會根據指定的partition規則選擇被存儲到哪一個partition。如果partition規則設計的合理,所有的消息會均勻分配到不同的partition里,這樣就實現了水平擴展。(如果一個topic對應一個文件,那這個文件所在的機器I/O將會成為這個topic的性能瓶頸,而partition解決了這個問題)。在創建topic時,可以在$KAFKA_HOME/config/server.properties中指定這個partition的數量(如下所示),當然也可以在topic創建之后去修改parition數量。

The default number of log partitions per topic. More partitions allow greater

parallelism for consumption, but this will also result in more files across

the brokers.

num.partitions=3     在發送一條消息時,可以指定這條消息的key,producer根據這個key和partition機制來判斷這個將這條消息發送到哪個partition。paritition機制可以通過指定producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。(比如如果一個key能夠被解析為整數,那么將對應的整數與partition總數取余,可以作為該消息被發送到的partition id)

  7. 歷史數據刪除機制

    對于傳統的message queue而言,一般會刪除已經消費過的消息,而Kafka集群會保留所有的消息,無論其被消費與否。當然,由于磁盤限制,不可能永久保留,因此Kafka提供兩種機制去刪除舊數據。一是基于時間,一是基于partition文件大小。(例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數據,也可通過配置讓Kafka在partition文件超過1GB時刪除舊數據)

    這里要注意,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除文件與Kafka性能無關,選擇怎樣的刪除策略只與磁盤以及具體的需求有關。另外,Kafka會為每一個consumer group保留一些metadata信息—當前消費的消息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條消息后線性增加這個offset。當然,consumer也可將offset設成一個較小的值,重新消費一些消息。因為offet由consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些消息被哪些consumer過,不需要通過broker去保證同一個consumer group只有一個consumer能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。

  8. Consumer Group

    對于程序員來說,consumer group 是消費 Kafka 消息隊列中消息的接口,每個 consumer group 可以消費多個 topic,對于每一個 topic 可以有多個消費者實體 consumer(對應多個程序或者進程)。在消費過程中,同一個 topic 中的消息只會被同一個 consumer group 中的一個 consumer 消費,而不會出現重復訂閱的情況。對于每一個 consumer group 消費 topic,可以手動commit,也可以設置參數集群自動commit(確認)消費進行的位置,保證下一次能接著從上次的位置繼續消費。

    consumer group 和 topic 一樣,也是直接使用就能新建的。如果直接新建一個 consumer,而不指定具體的 consumer group,系統會自動的指定默認的 consumer group,并且從最老的數據(EARLIEST)位置開始消費。

  詳情參考:

  http://developer.51cto.com/art/201501/464491.htm

  http://geek.csdn.net/news/detail/229569

  http://www.cnblogs.com/likehua/p/3999538.html

  1. PyKafka 的使用

  1. 導入 pykafka 模塊

    import pykafka

    from pykafka import KafkaClient

  2. 初始化 KafkaClient

    client = KafkaClient(hosts=“127.0.0.1:9092,127.0.0.1:9093,…“) 可以通過 hosts 地址初始化,也可以使用 zookeeper_hosts 進行初始化:

    client = KafkaClient(zookeeper_hosts = ‘yq01-ps-4-m42-pc177.yq01:2181,yq01-ps-4-m42-pc186.yq01:2181,yq01-ps-4-m42-pc187.yq01:2181,yq01-ps-4-m42-pc191.yq01:2181,yq01-ps-4-m42-pc192.yq01:2181’)

  3. Topic 對象

    client.topics 可以查看當前所有的 topic。

    topic = client.topics[‘bjhapp_history’]  #如果該 topic 存在,那么會選中對應的 topic;如果不存在,會自動新建該 topic。

    Topic 對象包含的方法:

      1. get_balanced_consumer(consumer_group, managed=False, **kwargs) :生成對應 consumer_group 對 topic 下消息消費的一個 balanced_consumer,與 simple_consumer 的差別在于如果有多個 consumer 進來對同一個 topic 的消息進行訂閱,balanced_consumer 會自動平衡和分配 partitions 給每個 consumer;而先進來的 simple_consumer 會對當前 topic 的partition 有100%的占有權。

        參數:consumer_group:消費的 consumer_group 名

           managed:是否對 consumer_group 進行管理

           **kwargs:對應于 consumer 對象的眾多參數

      2. get_producer(use_rdkafka=False, **kwargs):生成對應 topic 的一個異步消息 producer

      3. get_simple_consumer(consumer_group=None, use_rdkafka=False, **kwargs):生成對應 consumer_group 對 topic 的一個 simple_consumer

      4. get_sync_producer(**kwargs):生成對 topic 的一個同步 producer

    成員變量:

      name:topic 的名字

      partitions:包含當前 topic 對應 partitions 的字典

  4. Producer 對象

    1. 同步的 producer 對象

      producer = topic.get_sync_producer()

      producer.produce(“test”)

      同步的 producer 對象發布消息時,只有在確認消息成功發送到集群時才返回,因此網絡 IO 的速度會影響程序的整體速度。

    2. 異步的 producer 對象

      為了實現更高的吞吐量,我們推薦使用異步模式的 producer,這樣 produce() 函數能夠立即返回,并且可以批量處理更多的消息,而不用等待當前消息發布成功的確認。我們通過隊列的接口同樣可以在之后收到消息發布成功的確認,需要設置參數 delivery_reports = True。

      producer = topic.get_producer(delivery_reports=True)  #初始化異步的 producer

      count = 0  #定義 count 變量用來存儲消息發送的條數,以便于定期檢查之前的消息是否發送成功

復制代碼 def produce(msg, partition_key): global count producer.produce(msg, partition_key = partition_key) count += 1 if count % 10 == 0: while True: try: old_msg, exc = producer.get_delivery_report(block = False) if exc is not None: log.warn(“fail to delivery msg: %s, exc: %s, try again”,
old_msg.partition_key, exc) if type(exc) is not MessageSizeTooLarge: producer.produce(old_msg.value, partition_key = old_msg.partition_key) else: log.info(“succ delivery msg: %s”, old_msg.partition_key) except Queue.Empty: break; 復制代碼       上述代碼每嘗試發布十條消息,就對之前發送的消息的 delivery_report 進行檢查,查看其發布是否成功的狀態,通過 producer.get_delivery_report() 函數返回之前發送失敗的消息和結果,如果沒有發布成功,會嘗試重新進行發送。直到 delivery_report 的隊列為空。

      要注意 producer 發布消息時是先將消息存儲在緩存區,再將緩存區的消息發布到 Kafka 集群。所以異步的 produce() 函數執行完后,依然需要一定的時間來實現消息從緩存區的發布。所以如果文件執行結束,producer 對象會自動釋放,導致消息發布不成功,返回錯誤:ReferenceError: weakly-referenced object no longer exists。解決方法,在程序的尾部讓程序等待一段時間使消息發布完成,例如:sleep(6)

  5. Consumer 對象

    當一個 PyKafka consumer 開始從一個 topic 中訂閱消息時,它在記錄器中的起始位置是由 auto_offset_reset 和 reset_offset_on_start 兩個參數確定的。

consumer = topic.get_simple_consumer( consumer_group = ‘my_group’, auto_offset_reset = OffsetType.EARLIEST, reset_offset_on_start=False )     同樣,是否 Kafka 集群保有任何之前的 consumer group/topic/partition set 的消費偏移量也會影響數據的初始訂閱點。一個 new group/topic/partition set 就是之前沒有任何 commited offsets,一個存在的就是有 commited offsets 的。這兩者的訂閱點由下面的規則決定:

      1. 對于一個新的 consumer group/topic/partitions,不管參數 reset_offset_on_start 的參數是什么,都會從 auto_offset_reset 指定的位置開始消息訂閱。

      2. 對于一個已經存在的 consumer group/topic/partitions,假設參數 reset_offset_on_start 為false,那么消費會從上一次消費的偏移量之后開始進行(比如上一次的消費偏移量為4,那么消費會從5開始)。假設參數為 true,會自動從 auto_offset_reset 指定的位置開始消費。

    Tips:

    1. No handlers could be found for logger “pykafka.simpleconsumer”

      錯誤的原因是 consumer 在訂閱消息時需要有一個 logger 來記錄日志,如果有一個全局 logger 對象,會自動的寫入該全局對象中,否則會報這條信息,但是不影響消息訂閱。

    2. 有的時候會出現 consumer 在訂閱消息時遲遲不能讀出現的情況,這是由于 KafkaClient 的未知原因導致的,可以嘗試在初始化 consumer 的參數中加上 consumer_timeout_ms 參數來解決問題。該參數表示 consumer 在返回 None 前嘗試等待可以消費的消息的時間。



import sys
from pykafka import KafkaClient
from pykafka.balancedconsumer import BalancedConsumer
from pykafka.simpleconsumer import OwnedPartition, OffsetType
 
reload(sys)
sys.setdefaultencoding('utf8')
 
#pykafka, need install PyKafka
 
class PyKafka:
 
    consumer = None
    TOPIC   = 'log_download'
    BROKER_LIST = '10.23.23.24:9092,10.23.23.21:9092'
    ZK_LIST = '10.23.23.24:2181,10.23.23.21:2181/sh-bt'
 
    server = topic = zsServer = None
 
 
    def __init__(self):
        print("begin pykafka")
        self.server  = self.BROKER_LIST
        self.topic   = self.TOPIC
        self.zkServer= self.ZK_LIST
 
    def getConnect(self):
        client = KafkaClient(hosts=self.server)
        topic = client.topics[self.topic]
 
        self.consumer = topic.get_balanced_consumer(
            consumer_group="zs_download_04", # 自己命令
            auto_offset_reset=OffsetType.LATEST,#在consumer_group存在的情況下,設置此變量,表示從最新的開始取
            #auto_offset_reset=OffsetType.EARLIEST,
            #reset_offset_on_start=True,
            #auto_commit_enable=True,
            zookeeper_connect=self.zkServer
        )
        #self.consumer = topic.get_simple_consumer(reset_offset_on_start=False)
        self.consumer.consume()
        self.consumer.commit_offsets()
        return self.consumer
    
    def disConnect(self):
        #self.consumer.close()
        pass
 
        
    def beginConsumer(self):
        for oneLog in self.consumer:
            print(oneLog.offset)
            print(oneLog.value)
 
 
if __name__ == '__main__':
 
    pk = PyKafka()
    pk.getConnect()
    pk.beginConsumer()

本文鏈接:參與評論 ?

--EOF--

提醒:本文最后更新于 388 天前,文中所描述的信息可能已發生改變,請謹慎使用。

專題「web開發」的其它文章 ?

Comments

好日子高手社区广东好日子