python - 如何使用pykafka consumer進(jìn)行數(shù)據(jù)處理并保存?
問(wèn)題描述
使用本地kafka bin/kafka-console-producer.sh --broker-list kafkaIP:port --topic topicName創(chuàng)建命令行生產(chǎn)數(shù)據(jù),然后打開(kāi)python
from pykafka import KafkaClientclient = KafkaClient(hosts='192.168.x.x:9092')topic = client.topics[’wr_test’]consumer = topic.get_balanced_consumer(consumer_group=’test-consumer-group’,auto_commit_enable=True,zookeeper_connect=’192.168.x.x:2121’)
然后自己編寫(xiě)了簡(jiǎn)單的一套處理函數(shù),從外部引用。將數(shù)據(jù)處理后存入elasticsearch 或者 數(shù)據(jù)庫(kù)比如for msg in consumer:
if msg is not None: 外部引入的處理函數(shù)(msg.value)
在python命令行for msg in consumer:
print msg.offset, msg.value
這時(shí)候使用生產(chǎn)者敲入一些數(shù)據(jù),在消費(fèi)端就會(huì)就會(huì)立即打印出來(lái)但是寫(xiě)成py文件之后,每次運(yùn)行只會(huì)處理最近的生產(chǎn)的一次內(nèi)容,在生產(chǎn)者中再進(jìn)行輸入一些內(nèi)容,py文件就不會(huì)再進(jìn)行數(shù)據(jù)處理了。所以向問(wèn)下如何編寫(xiě)能運(yùn)行后能一直對(duì)消費(fèi)者數(shù)據(jù)進(jìn)行處理的函數(shù)?要注意哪些地方?
另外,get_balanced_consumer的方法,是連接zookeeper消費(fèi)使用topic.get_simple_consumer是直接消費(fèi)kafka,使用這種方式就提示No handler for...的錯(cuò)誤
還有一個(gè)疑問(wèn),就是實(shí)際生產(chǎn)環(huán)境日志產(chǎn)生量很快,應(yīng)該如何編寫(xiě)一個(gè)多線程處理方法?
問(wèn)題解答
回答1:在別人的博客看到一種替代的解決方案http://www.cnblogs.com/castle...從consumer中將msg.value讀取到一個(gè)列表當(dāng)中,然后從列表中讀取數(shù)據(jù)進(jìn)行數(shù)據(jù)處理,當(dāng)這個(gè)流程結(jié)束后,再把列表中獲取的數(shù)據(jù)pop掉。另外也要用try: ... except :... continue
相關(guān)文章:
1. mysql - 兩張表做修改2. mysql多表查詢3. 怎么用navicat for mysql連接別的電腦的數(shù)據(jù)庫(kù)?4. 開(kāi)發(fā)一套網(wǎng)站CMS難嗎?5. 更新mysql中被別人鎖定的行, 能不能快速失敗直接報(bào)錯(cuò), 而不是一直等待6. 主從復(fù)制 - MySQL 主從延遲 300s 以上,求大神解答7. MySQL 查詢疑問(wèn)?8. 最新版本的PhpStudy支持64位的PHP嗎? 為什么我把64位的PHP放入后,后臺(tái)收不到POST9. mysql - 類(lèi)似于之類(lèi)的通知系統(tǒng)如何設(shè)計(jì)數(shù)據(jù)庫(kù)10. 使用mysql自增主鍵遇到的問(wèn)題
