介绍RabbitMq的应用场景,与kafka的区别,以及原理和注意事项;

Broker:简单来说就是消息队列服务器实体。 
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 
producer:消息生产者,就是投递消息的程序。 
consumer:消息消费者,就是接受消息的程序。 
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

RabbitMQ的应用场景以及基本原理介绍

https://blog.csdn.net/whoamiyang/article/details/54954780

  • 应用解耦
  • 流量削峰
  • 系统架构
  • Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照 指定的方式传输。

RabbitMQ三种Exchange模式

https://www.cnblogs.com/hz04022016/p/6519445.html

RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储。 RabbitMQ提供了四种Exchange:fanout, direct, topic, header。但常用的主要是fanout,direct,topic 性能排序:fanout > direct > topic 比例大约为11:10:6

Fanout Exchange

Fanout Exchange

    任何发送到Fanout Exchange的消息都会被转发到与该Exchange(Binding)的所有Queue上。
  • 可以理解为路由表的模式
  • 这种模式不需要RouteKey
  • 这种模式需要提前将Exchange 与 Queue进行绑定,一个Exchange可以板顶多个Queue,一个Queue可以同多个Exchange进行绑定
  • 如果接受到消息的Exchange没有与任何Queue进行绑定,则消息会被抛弃

注意:广播,是实时的,收不到就没了,消息不会存下来,类似收音机。



          |------------------------|
          |            /—— queue <—|—> consumer1
producer —|—exchange1 <bind        |                 
       \  |            \—— queue <—|—> consumer2
        \-|-exchange2    ……        |
          |------------------------|

样例代码
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
# 注意:这里是广播,不需要声明queue
channel.exchange_declare(exchange='logs',  # 声明广播管道
                         type='fanout')    # type默认是direct模式

# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',  # 注意此处空,必须有
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')
# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(exclusive=True)
# 获取随机的queue名字
queue_name = result.method.queue
print("random queuename:", queue_name)

channel.queue_bind(exchange='logs',  # queue绑定到转发器上
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

TODO Direct Exchange

Direct Exchange

    任何发送到Direct Exchange的消息都会被转达到RouteKey中指定的的Queue
  • 一般情况下可以使用rabbitMQ自带的Exchange: ""(该Exchange的名字为空字符串,下文称其为default Exchange)。
  • 这种模式下不需要讲Exchange进行任何绑定(binding)操作
  • 消息传递时需要一个"RoutingKey",可以简单理解为要发送到的队列名字
  • 如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃

正常情况下,queue一直会等到消费者去消费信息。

如果长时间没有人去消费信息,咋办? 一个queue其实可以bind多个key的信息;

!!! 同样,假如exchange下面没有binding好queue(自己手动设置的),则消息发送到这后,会丢失; 假如有之前binding过具体的queue的话,则exchange接收到的消息可以发送到对应的queue上,即使 没有消费者消费这些信息;

样例代码
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')
# 重要程度级别,这里默认定义为 info
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 获取运行脚本所有的参数
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
# 循环列表去绑定
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

默认写法

import pika

# 建立一个实例
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  # 默认端口5672,可不写
    )
# 声明一个管道,在管道里发消息
channel = connection.channel()
# 在管道里声明queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',          # 这个地方不指定的话,使用的是默认的direct模式的AMQP
                      routing_key='hello',  # queue名字
                      body='Hello World!')  # 消息内容
print(" [x] Sent 'Hello World!'")
connection.close()  # 队列关闭


Topic Exchange

Topic Exchange

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。

2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。

3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。

4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。

样例代码
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

RabbitMQ UI

UI里面的消息内容一共三种类型 ready unacked total; 具体代表是什么意思呢? https://blog.csdn.net/stonexmx/article/details/51885745

Pika提供如下适配器

https://blog.csdn.net/wulex/article/details/66971823

 BlockingConnection - 启用对库进行阻塞,同步操作以进行简单的使用
 LibevConnection - 用于libev事件循环http://libev.schmorp.de的适配器
 SelectConnection - 快速异步适配器
 TornadoConnection - 适用于Tornado IO Loop的适配器http://tornadoweb.org
 TwistedConnection - 用于Twisted异步包的适配器http://twistedmatrix.com/

RabbitMQ死循环-延长ACK时间

RabbitMQ URI Specification

TODO Rabbitmq消费失败死信队列

我为什么要选择RabbitMQ ,RabbitMQ简介,各种MQ选型对比

https://www.sojson.com/blog/48.html

python(十一)上:RabbitMQ 使用详细介绍

https://blog.csdn.net/fgf00/article/details/52872730

java应用实例

https://blog.csdn.net/Dome_/article/details/80028087