亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁技術文章
文章詳情頁

Python RabbitMQ實現簡單的進程間通信示例

瀏覽:99日期:2022-07-18 18:36:11

RabbitMQ 消息隊列

PYthreading Queue進程Queue 父進程與子進程,或同一父進程下的多個子進程進行交互缺點:兩個不同Python文件不能通過上面兩個Queue進行交互

erlong基于這個語言創建的一種中間商win中需要先安裝erlong才能使用rabbitmq_server start

安裝 Python module

pip install pika

or

easy_install pika

or源碼

rabbit 默認端口15672查看當前時刻的隊列數rabbitmqctl.bat list_queue

exchange在定義的時候就是有類型的,決定到底哪些queue符合條件,可以接受消息fanout:所有bind到此exchange的queue都可以收到消息direct:通過routingkey和exchange決定唯一的queue可以接受消息topic: 所有符合routingkey(此時可以是一個表達式)的routingkey所bind的queue都可以接受消息 表達式符號說明: # 代表一個或多個字符 * 代表任何字符

RPCremote procedure call 雙向傳輸,指令<-------->指令執行結果實現方法:創建兩個隊列,一個隊列收指令,一個隊列發送執行結果

用rabbitmq實現簡單的生產者消費者模型

1) rabbit_producer.py

# Author : Xuefengimport pikaconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()# create the queue, the name of queue is 'hello'# durable=True can make the queue be exist, although the service have stopped before.channel.queue_declare(queue='hello', durable=True)# n RabbitMQ a message can never be sent directly to queue,it always need to go throughchannel.basic_publish(exchange = ' ', routing_key = 'hello', body = 'Hello world!', properties = pika.BasicPropreties( delivery_mode=2, # make the message persistence ) )print('[x] sent ’Hello world!’')connection.close()

2) rabbit_consumer.py

# Author : Xuefengimport pikaconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.queue_declare(queue='hello', durable=True)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# follow is for consumer to auto change with the abilitychannel.basic_qos(profetch_count=1)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的fanout模式實現廣播模式

1) fanout_rabbit_publish.py

# Author : Xuefengimport pikaimport sys# 廣播模式:# 生產者發送一條消息,所有的開通鏈接的消費者都可以接收到消息connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='logs', type='fanout')message = ’ ’.join(sys.argv[1:]) or 'info:Hello world!'channel.basic_publish( exchange='logs', routing_key='', body=message)print('[x] Send %r' % message)connection.close()

2) fanout_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)channel.queue_bind(exchange='logs', queue=queue_name)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的direct模式實現消息過濾模式

1) direct_rabbit_publisher.py

# Author : Xuefengimport pikaimport sys# 消息過濾模式:# 生產者發送一條消息,通過severity優先級來確定是否可以接收到消息connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ’ ’.join(sys.argv[2:]) or 'info:Hello world!'channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message)print('[x] Send %r:%r' % (severity, message))connection.close()

2) direct_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)severities = sys.argv[1:]if not severities: sys.stderr.write('Usage:%s [info] [warning] [error]n' % sys.argv[0]) sys.exit(1)for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的topic模式實現細致消息過濾模式

1) topic_rabbit_publisher.py

# Author : Xuefengimport pikaimport sys# 消息細致過濾模式:# 生產者發送一條消息,通過運行腳本 *.info 等確定接收消息類型進行對應接收connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')binding_key = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ’ ’.join(sys.argv[2:]) or 'info:Hello world!'channel.basic_publish( exchange='topic_logs', routing_key=binding_key, body=message)print('[x] Send %r:%r' % (binding_key, message))connection.close()

2) topic_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)binding_keys = sys.argv[1:]if not binding_keys: sys.stderr.write('Usage:%s [info] [warning] [error]n' % sys.argv[0]) sys.exit(1)for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag=method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue='hello', no_ack=True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq實現rpc操作

1) Rpc_rabbit_client.py

# Author : Xuefengimport pikaimport timeimport uuidclass FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 隨機的生成一個接收命令執行結果的隊列 self.channel.basic_consume(self.on_response, # 只要收到消息就調用 no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicPropreties( rely_to=self.callback_queue, correlation_id=self.corr_id # 通過隨機生成的ID來驗證指令執行結果與指令的匹配性 ), body=str(n) ) while self.response is None: self.connection.process_data_events() # 非阻塞版的start_consume,有沒有消息都繼續 print('no message...') time.sleep(0.5) return int(self.response)fibonacci_rcp = FibonacciRpcClient()print('[x] Requesting fib(30)')response = fibonacci_rcp.call(30)print('[x] Rec %r' % response)

2) Rpc_rabbit_server.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1)+fib(n-2)def on_request(ch, method, props, body): n = int(body) print('[.] fib(%s)' % n) response = fib(n) ch.basic_publish( exchange='', routing_key=props.rely_to, properties=pika.BasicPropreties(correlation_id= props.correlation), body = str(body) ) ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue')print('[x] Awaiting RPC requests')channel.start_consumeing()channel.exchange_declare(exchange='direct_logs', type='direct')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)severities = sys.argv[1:]

到此這篇關于Python RabbitMQ實現簡單的進程間通信示例的文章就介紹到這了,更多相關Python RabbitMQ進程間通信內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Python 編程
相關文章:
主站蜘蛛池模板: 久久99精品久久久久久黑人 | 国产国语在线播放视频 | 免费爱爱的视频太爽了 | 精品一区二区三区视频在线观看免 | 国产三级网站 | 久久精品视频免费在线观看 | 成人永久福利免费观看 | 欧美一级毛片国产一级毛片 | 国产精品国产三级国产爱网 | 极品蜜桃臀美女啪啪 | 国模一区二区三区私啪啪 | 一个人看的ww免费视频 | 性做久久久久久久久浪潮 | 小明永久免费视频 | 久久综合九色综合亚洲小说 | 亚洲经典自拍 | 亚洲欧美一区二区三区在线观看 | 国产在线观看精品香蕉v区 国产在线每日更新 | 国产精品日韩欧美 | 高清性色生活片免费视频软件 | 亚洲黄色小视频 | 香蕉久久夜色精品国产尤物 | 亚洲欧洲无码一区二区三区 | 免费看一级淫片成人 | 免费国产人做人视频在线观看 | mm在线| 免费特黄 | 中文字幕亚洲综合久久男男 | 久久久久久亚洲精品影院 | 欧洲一级毛片 | 一级毛毛片毛片毛片毛片在线看 | 国产午夜精品久久久久免费视小说 | 免费在线观看的黄色网址 | 一级做片爱性视频免费 | 国产伦精品一区二区三区 | 精品视频麻豆网站 | 久久在现 | 深夜福利国产精品亚洲尤物 | 爱爱免费播放视频在线观看 | 亚洲一区二区三区成人 | 亚洲精品人成网在线播放影院 |