博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ详解(三)
阅读量:5951 次
发布时间:2019-06-19

本文共 9988 字,大约阅读时间需要 33 分钟。

一、分发到多Consumer(fanout)
二、Routing路由(Direct)
三、主题路由(Topic)

一、分发到多Consumer(fanout)
将同一个Message deliver到多个Consumer中。这个模式也被称为"publish/subscribe"
创建一个日志系统,包含两部分:第一部分发出log(Producer),第二部分接收到并打印(Consumer)。两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。

1.发送消息流程:

    1.Producer发送的Message实际上是发到了Exchange中。
    2.Exchanges从Producer接收message投递到queue中
    3.Prducer发送的消息只是到达了Exchange中,Exchange具有不同的类型实现不同的分发方式

Exchnges的类型:direct、topic和fanout
fanout就是广播模式,会将所有的Message都放到它所知道的queue中
channel.exchange_declare(exchange='logs',  
    type='fanout')   //创建一个名字为logs,类型为fanout的Exchange:

1
2
3
4
5
6
7
8
9
10
11
[root@node
112 
~]# rabbitmqctl list_exchanges //查看所有的Exchanges
Listing exchanges ...
logs  fanout
amq.direct    direct
amq.fanout    fanout
amq.headers    headers
amq.match    headers
amq.rabbitmq.log    topic
amq.rabbitmq.trace    topic
amq.topic    topic
...done.

注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。 

通过exchange,而不是routing_key来publish Message:
channel.basic_publish(exchange='logs',  
    routing_key='',  
    body=message)  

2.临时队列
截至现在,我们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。
但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
    1)每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。方法:
    result = channel.queue_declare() 
    通过result.method.queue 可以取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
    2)当Consumer关闭连接时,这个queue要被deleted。可以加个exclusive的参数。方法:
    result = channel.queue_declare(exclusive=True)   //每次获取的都是新的,单独使用的
    
3.Bindings绑定

    创建好fanout类型的Exchange和没有名字的queue后(实际上是RabbitMQ帮我们取的名字)Exchange通过bindings把它的Message发送到目标queue
    channel.queue_bind(exchange='logs',  
        queue=result.method.queue)      
    使用命令rabbitmqctl list_bindings 查看bindings
    
4.最终代码
拓扑图:
1.png

Producer,在这里就是产生log的program,基本上和前几个都差不多。最主要的区别就是publish通过了exchange而不是routing_key。
emit_log.py script:
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    
host=
'localhost'
))
channel = connection.channel()
channel.exchange_declare(exchange=
'logs'
,
    
type=
'fanout'
)
message = 
' '
.join(sys.argv[
1:
]) or 
"info: Hello World!"
channel.basic_publish(exchange=
'logs'
,
    
routing_key=
''
,
    
body=message)
print 
" [x] Sent %r" 
% (message,)
connection.close()

还有一点要注意的是我们声明了exchange。publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,log是被丢弃的。
Consumer:receive_logs.py:
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
    
host=
'localhost'
))
channel = connection.channel()
channel.exchange_declare(exchange=
'logs'
,
    
type=
'fanout'
)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange=
'logs'
,
    
queue=queue_name)
print 
' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
    
print 
" [x] %r" 
% (body,)
channel.basic_consume(callback,
    
queue=queue_name,
    
no_ack=True)
channel.start_consuming()

试运行:
    Consumer1:$ python receive_logs.py > logs_from_rabbit.log  //追加到文件
    Consumer2:python receive_logs.py //输出到屏幕
    Producer:python emit_log.py
也可通过修改callback自己写文件
输出结果如图:
3.png

二、Routing路由(Direct)
对于上一个日志系统改进。能够使用不同的severity来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。
1.Bindings绑定
之前的绑定
channel.queue_bind(exchange=exchange_name,  
    queue=queue_name)  
绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。
实际上,绑定可以带routing_key 这个参数。其实这个参数的名称和basic_publish 的参数名是相同了。为了避免混淆,我们把它成为binding key。
    使用一个key来创建binding :
channel.queue_bind(exchange=exchange_name,  
    queue=queue_name,  
    routing_key='black') 
对于fanout的exchange来说,这个参数是被忽略的。

2.Direct Exchange

通过Bindings key完全匹配
图Direct路由模型
Direct.png

exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。
当P publish key是orange时,exchange会把它放到Q1。如果是black或者green那么就会到Q2。其余的Message都会被丢弃。

3.多重绑定(Multiple Bindings)

多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver到Q1和Q2。其余的Message都会被丢弃。
图muliti-bindings
multi.png

4.生产者和消费者

生产者:
===========================================================================

1
2
3
4
5
6
7
8
channel.exchange_declare(exchange=
'direct_logs'
,  
    
type=
'direct'
)  
//创建一个direct的exchange。使用log的severity作为routing key,这样Consumer可以针对不同severity的log进行不同的处理。
publish:
channel.basic_publish(exchange=
'direct_logs'
,  
    
routing_key=severity, 
    
body=message)  
//涉及三种severity:
'info'
'warning'
'error'
.

消费者:
===========================================================================

1
2
3
4
5
6
7
result = channel.queue_declare(exclusive=True)  
queue_name = result.method.queue  
for severity in severities:  
    
channel.queue_bind(exchange=
'direct_logs'
,  
        
queue=queue_name,  
        
routing_key=severity) 
//queue需要绑定severity

5.最终版本
图:direct_2
direct_2.png

emit_log_direct.py 
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    
host=
'localhost'
))
channel = connection.channel()
channel.exchange_declare(exchange=
'direct_logs'
,
    
type=
'direct'
)
severity = sys.argv[
1
] if len(sys.argv) > 
1 
else 
'info'
message = 
' '
.join(sys.argv[
2:
]) or 
'Hello World!'
channel.basic_publish(exchange=
'direct_logs'
,
    
routing_key=severity,
    
body=message)
print 
" [x] Sent %r:%r" 
% (severity, message)
connection.close()

receive_logs_direct.py: 
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python  
import pika  
import sys  
connection = pika.BlockingConnection(pika.ConnectionParameters(  
    
host=
'localhost'
))  
channel = connection.channel()  
channel.exchange_declare(exchange=
'direct_logs'
,  
    
type=
'direct'
)  
result = channel.queue_declare(exclusive=True)  
queue_name = result.method.queue  
severities = sys.argv[
1:
]  
if not severities:  
    
print 
>> sys.stderr, 
"Usage: %s [info] [warning] [error]" 
% \  
        
(sys.argv[
0
],)  
    
sys.exit(
1
)  
for severity in severities:      
    
channel.queue_bind(exchange=
'direct_logs'
,  
        
queue=queue_name,  
        
routing_key=severity)  
print 
' [*] Waiting for logs. To exit press CTRL+C'  
def callback(ch, method, properties, body):  
    
print 
" [x] %r:%r" 
% (method.routing_key, body,)  
channel.basic_consume(callback,  
    
queue=queue_name,  
    
no_ack=True)  
channel.start_consuming()

===========================================================================
试运行:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log 
    //把warning和error的log记录到一个文件中
$ python receive_logs_direct.py info warning error  
    //打印所有log到屏幕    

三、主题路由(Topic)
1.Topic exchange

Message的routing_key使用限制,不能使任意的。格式是以点号“."分割的字符表。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。
    对于routing_key,有两个特殊字符(在正则表达式里叫元字符):
    * (星号) 代表任意 一个单词
    # (hash) 0个或者多个单词
示例:
Producer发送消息时需要设置routing_key,routing_key包含三个单词和两个点号。
    第一个key是描述了celerity(灵巧,敏捷),第二个是colour(色彩),第三个是species(物种):"<celerity>.<colour>.<species>"。
在这里我们创建了两个绑定: Q1 的binding key 是"*.orange.*"; Q2 是  "*.*.rabbit" 和 "lazy.#":
    Q1 感兴趣所有orange颜色的动物
    Q2 感兴趣所有的rabbits和所有的lazy的
比如routing_key是 "quick.orange.rabbit"将会发送到Q1和Q2中。消息"lazy.orange.elephant" 也会发送到Q1和Q2。但是"quick.orange.fox" 会发送到Q1;"lazy.brown.fox"会发送到Q2。"lazy.pink.rabbit" 也会发送到Q2,但是尽管两个routing_key都匹配,它也只是发送一次。"quick.brown.fox" 会被丢弃。
如果发送的单词不是3个呢? 答案要看情况,因为#是可以匹配0个或任意个单词。比如"orange" or "quick.orange.male.rabbit",它们会被丢弃。如果是lazy那么就会进入Q2。类似的还有 "lazy.orange.male.rabbit",尽管它包含四个单词。

Topic exchange和其他exchange
    由于有"*" (star) and "#" (hash), Topic exchange 非常强大并且可以转化为其他的exchange:
    如果binding_key 是 "#" - 它会接收所有的Message,不管routing_key是什么,就像是fanout exchange。
    如果 "*" (star) and "#" (hash) 没有被使用,那么topic exchange就变成了direct exchange。

2.代码实现
The code for emit_log_topic.py:
========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    
host=
'localhost'
))
channel = connection.channel()
channel.exchange_declare(exchange=
'topic_logs'
,
    
type=
'topic'
)
routing_key = sys.argv[
1
] if len(sys.argv) > 
1 
else 
'anonymous.info'
message = 
' '
.join(sys.argv[
2:
]) or 
'Hello World!'
channel.basic_publish(exchange=
'topic_logs'
,
    
routing_key=routing_key,
    
body=message)
print 
" [x] Sent %r:%r" 
% (routing_key, message)
connection.close()

========================================================================

The code for receive_logs_topic.py:     
========================================================================    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    
host=
'localhost'
))
channel = connection.channel()
channel.exchange_declare(exchange=
'topic_logs'
,
    
type=
'topic'
)
     
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[
1:
]
if not binding_keys:
    
print 
>> sys.stderr, 
"Usage: %s [binding_key]..." 
% (sys.argv[
0
],)
    
sys.exit(
1
)
for binding_key in binding_keys:
    
channel.queue_bind(exchange=
'topic_logs'
,
        
queue=queue_name,
        
routing_key=binding_key)
print 
' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
    
print 
" [x] %r:%r" 
% (method.routing_key, body,)
channel.basic_consume(callback,
    
queue=queue_name,
    
no_ack=True)
channel.start_consuming()

    
3.运行和结果

    python receive_logs_topic.py "#"  //接收所有的log
    python receive_logs_topic.py "kern.*"  //接收所有kern facility的log
    python receive_logs_topic.py "*.critical"  //仅仅接收critical的log: 
    python receive_logs_topic.py "kern.*" "*.critical"  //可以创建多个绑定: 
    python emit_log_topic.py "kern.critical" "A critical kernel error"  //Producer产生一个log:"kern.critical" type: 
    
参考:    
http://www.rabbitmq.com/tutorials/tutorial-three-python.html

本文转自MT_IT51CTO博客,原文链接:http://blog.51cto.com/hmtk520/2051247,如需转载请自行联系原作者

你可能感兴趣的文章
洛谷 P2486 BZOJ 2243 [SDOI2011]染色
查看>>
linux 笔记本的温度提示
查看>>
数值积分中的辛普森方法及其误差估计
查看>>
Web service (一) 原理和项目开发实战
查看>>
跑带宽度多少合适_跑步机选购跑带要多宽,你的身体早就告诉你了
查看>>
广平县北方计算机第一届PS设计大赛
查看>>
深入理解Java的接口和抽象类
查看>>
java与xml
查看>>
Javascript异步数据的同步处理方法
查看>>
iis6 zencart1.39 伪静态规则
查看>>
SQL Server代理(3/12):代理警报和操作员
查看>>
Linux备份ifcfg-eth0文件导致的网络故障问题
查看>>
2018年尾总结——稳中成长
查看>>
JFreeChart开发_用JFreeChart增强JSP报表的用户体验
查看>>
度量时间差
查看>>
通过jsp请求Servlet来操作HBASE
查看>>
Shell编程基础
查看>>
Shell之Sed常用用法
查看>>
3.1
查看>>
校验表单如何摆脱 if else ?
查看>>