Python RabbitMQ實現簡單的進程間通信示例
RabbitMQ 消息隊列
PYthreading Queue進程Queue 父進程與子進程,或同一父進程下的多個子進程進行交互缺點:兩個不同Python文件不能通過上面兩個Queue進行交互
erlong基于這個語言創建的一種中間商win中需要先安裝erlong才能使用rabbitmq_server start
安裝 Python module
pip install pika
or
easy_install pika
or源碼
rabbit 默認端口15672查看當前時刻的隊列數rabbitmqctl.bat list_queue
exchange在定義的時候就是有類型的,決定到底哪些queue符合條件,可以接受消息fanout:所有bind到此exchange的queue都可以收到消息direct:通過routingkey和exchange決定唯一的queue可以接受消息topic: 所有符合routingkey(此時可以是一個表達式)的routingkey所bind的queue都可以接受消息 表達式符號說明: # 代表一個或多個字符 * 代表任何字符
RPCremote procedure call 雙向傳輸,指令<-------->指令執行結果實現方法:創建兩個隊列,一個隊列收指令,一個隊列發送執行結果
用rabbitmq實現簡單的生產者消費者模型
1) rabbit_producer.py
# Author : Xuefengimport pikaconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()# create the queue, the name of queue is 'hello'# durable=True can make the queue be exist, although the service have stopped before.channel.queue_declare(queue='hello', durable=True)# n RabbitMQ a message can never be sent directly to queue,it always need to go throughchannel.basic_publish(exchange = ' ', routing_key = 'hello', body = 'Hello world!', properties = pika.BasicPropreties( delivery_mode=2, # make the message persistence ) )print('[x] sent ’Hello world!’')connection.close()
2) rabbit_consumer.py
# Author : Xuefengimport pikaconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.queue_declare(queue='hello', durable=True)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# follow is for consumer to auto change with the abilitychannel.basic_qos(profetch_count=1)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()
用rabbitmq中的fanout模式實現廣播模式
1) fanout_rabbit_publish.py
# Author : Xuefengimport pikaimport sys# 廣播模式:# 生產者發送一條消息,所有的開通鏈接的消費者都可以接收到消息connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='logs', type='fanout')message = ’ ’.join(sys.argv[1:]) or 'info:Hello world!'channel.basic_publish( exchange='logs', routing_key='', body=message)print('[x] Send %r' % message)connection.close()
2) fanout_rabbit_consumer.py
# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)channel.queue_bind(exchange='logs', queue=queue_name)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()
用rabbitmq中的direct模式實現消息過濾模式
1) direct_rabbit_publisher.py
# Author : Xuefengimport pikaimport sys# 消息過濾模式:# 生產者發送一條消息,通過severity優先級來確定是否可以接收到消息connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ’ ’.join(sys.argv[2:]) or 'info:Hello world!'channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message)print('[x] Send %r:%r' % (severity, message))connection.close()
2) direct_rabbit_consumer.py
# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)severities = sys.argv[1:]if not severities: sys.stderr.write('Usage:%s [info] [warning] [error]n' % sys.argv[0]) sys.exit(1)for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()
用rabbitmq中的topic模式實現細致消息過濾模式
1) topic_rabbit_publisher.py
# Author : Xuefengimport pikaimport sys# 消息細致過濾模式:# 生產者發送一條消息,通過運行腳本 *.info 等確定接收消息類型進行對應接收connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')binding_key = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ’ ’.join(sys.argv[2:]) or 'info:Hello world!'channel.basic_publish( exchange='topic_logs', routing_key=binding_key, body=message)print('[x] Send %r:%r' % (binding_key, message))connection.close()
2) topic_rabbit_consumer.py
# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)binding_keys = sys.argv[1:]if not binding_keys: sys.stderr.write('Usage:%s [info] [warning] [error]n' % sys.argv[0]) sys.exit(1)for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag=method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue='hello', no_ack=True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()
用rabbitmq實現rpc操作
1) Rpc_rabbit_client.py
# Author : Xuefengimport pikaimport timeimport uuidclass FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 隨機的生成一個接收命令執行結果的隊列 self.channel.basic_consume(self.on_response, # 只要收到消息就調用 no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicPropreties( rely_to=self.callback_queue, correlation_id=self.corr_id # 通過隨機生成的ID來驗證指令執行結果與指令的匹配性 ), body=str(n) ) while self.response is None: self.connection.process_data_events() # 非阻塞版的start_consume,有沒有消息都繼續 print('no message...') time.sleep(0.5) return int(self.response)fibonacci_rcp = FibonacciRpcClient()print('[x] Requesting fib(30)')response = fibonacci_rcp.call(30)print('[x] Rec %r' % response)
2) Rpc_rabbit_server.py
# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1)+fib(n-2)def on_request(ch, method, props, body): n = int(body) print('[.] fib(%s)' % n) response = fib(n) ch.basic_publish( exchange='', routing_key=props.rely_to, properties=pika.BasicPropreties(correlation_id= props.correlation), body = str(body) ) ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue')print('[x] Awaiting RPC requests')channel.start_consumeing()channel.exchange_declare(exchange='direct_logs', type='direct')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)severities = sys.argv[1:]
到此這篇關于Python RabbitMQ實現簡單的進程間通信示例的文章就介紹到這了,更多相關Python RabbitMQ進程間通信內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!
相關文章: