博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitmq redis
阅读量:7055 次
发布时间:2019-06-28

本文共 6118 字,大约阅读时间需要 20 分钟。

RabbitMQ

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列

用rabbitmq实现一个简单的生产者消费者模型

发送端代码

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))channel = connection.channel()channel.queue_declare(queue="hello")channel.basic_publish(exchange='',                     routing_key = 'hello',                     body='hello world',)print("Send hello world")connection.close()

接收端代码

1 import pika 2  3 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) 4 channel = connection.channel() 5 channel.queue_declare(queue="hello") 6  7 def callback(ch,method,properties,body): 8     print(ch,method,properties) 9     print("received %s" %body)10 11 channel.basic_consume(callback,12                       queue='hello',13                       no_ack=True)14 15 print("waiting for messages to exit press 'CTRL+C'")16 channel.start_consuming()

通过上述代码便可以实现一个简单的生产者消费者模型,但是现在的结果是:当开启多个消费者程序的时候,启动生产者发送消息,这个时候只有一个可以收到,并且再次启动,会下一个消费者收到,类似一个轮询的关系。

acknowledgment 消息不丢失(通过客户端设置实现)

通过no_ack = False参数设置,如果消费者遇到情况突然中断了没有收到,那么RabbitMQ会重新将任务添加到队列中

下面将接收端的代码进行更改:

#AUTHOR:FANimport pikaimport timeconnection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))channel = connection.channel()channel.queue_declare(queue="hello")def callback(ch,method,properties,body):    print(ch,method,properties)    time.sleep(10)    print("received %s" %body)channel.basic_consume(callback,                      queue='hello',                      no_ack=False)print("waiting for messages to exit press 'CTRL+C'")channel.start_consuming()

标注的地方就是代码修改的地方,通过将no_ack更改为False,以及在callback回到函数这里让等待10s,这样启动接收端后,再启动发送算,在还没有打印数据的时候将客户端关闭,然后再启动,发现依然可以收到刚才发送端发送的数据。

但是这种方式只能实现客户端断开重新连接的时候数据不丢失,如果是rabbitmq挂了的情况如何解决?

durable消息不丢失(通过在服务端设置保证数据不丢失)

这个时候生产者和消费者的代码都需要改动

发送者代码

1 import pika 2  3  4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) 5 channel = connection.channel() 6  7 channel.queue_declare(queue='fan',durable=True) 8  9 channel.basic_publish(exchange='',10                       routing_key='fan',11                       body='hello world',12                       properties = pika.BasicProperties(13                           delivery_mode=214                       ))15 16 print("send 'hello world'")17 connection.close()

接收者的代码

1 import pika 2 import time 3  4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) 5 channel = connection.channel() 6  7 channel.queue_declare(queue='fan',durable=True) 8  9 def callback(ch,method,properies,body):10     print("received %s" %body)11     time.sleep(10)12     print("is ok")13     ch.basic_ack(delivery_tag=method.delivery_tag)14 15 channel.basic_consume(callback,16                       queue='fan',17                       no_ack=False)18 19 print("waitting for messages.To exit press CTRL+C")20 channel.start_consuming()

这样即使在接收者接收数据过程中rabbitmq服务器出现问题了,在服务恢复之后,依然可以收到数据

发布订阅

 

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

通过exchange type = fanout参数实现

代码例子:

发布者:

1 #AUTHOR:FAN 2  3 import pika 4 import sys 5  6 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.8.103')) 7 channel = connection.channel() 8  9 channel.exchange_declare(exchange="fan",10                          type='fanout')11 12 message = ' '.join(sys.argv[1:]) or "info :hello world"13 channel.basic_publish(exchange = 'fan',14                       routing_key='',15                       body=message)16 17 print("send %s" %message)18 connection.close()

订阅者:

#AUTHOR:FANimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('192.168.8.103'))channel = connection.channel()channel.exchange_declare(exchange="fan",                         type='fanout')#随机生成队列名字result = channel.queue_declare(exclusive=True)queue_name = result.method.queue#将exchange和队列绑定channel.queue_bind(exchange='fan',                   queue=queue_name)print("waiting for fan ,To exit press CTRL+C")def callback(ch,method,proerties,body):    print("---",body)channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()

关键字发送

 

 

 

通过参数:exchange type = direct实现

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

代码例子如下:

消费者代码:

1 #AUTHOR:FAN 2 import pika 3 import sys 4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) 5 channel = connection.channel() 6 channel.exchange_declare(exchange='direct_logs_1', 7                          type='direct') 8 result = channel.queue_declare(exclusive=True) 9 queue_name = result.method.queue10 11 severities = sys.argv[1:]12 if not severities:13     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])14     exit(1)15 print(severities)16 for severity in severities:17     print(severity)18     channel.queue_bind(exchange='direct_logs_1',19     queue=queue_name,20     routing_key=severity)21 print("waiting for logs,To exit press CTRL+C")22 def callback(ch,method,properties,body):23     print("%s:%s" %(method.routing_key,body))24 25 channel.basic_consume(callback,26                       queue=queue_name,27                       no_ack=True)28 channel.start_consuming()

生产者代码

1 import pika 2 import sys 3  4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) 5 channel = connection.channel() 6  7 channel.exchange_declare(exchange='direct_logs_1', 8                          type='direct') 9 10 print(sys.argv)11 severity = sys.argv[1] if len(sys.argv) >1 else "error"12 message = ' '.join(sys.argv[2:]) or 'hello world'13 channel.basic_publish(exchange='direct_logs_1',14                       routing_key = severity,15                       body = message)16 print("send %s:%s" %(severity,message))17 connection.close()

模糊匹配

 

通过参数exchange type = topic实现

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

 

# 表示可以匹配 0 个 或 多个 单词

*  表示只能匹配 一个 单词

 

 

--------------------还没有整理完

转载地址:http://lygol.baihongyu.com/

你可能感兴趣的文章
vim
查看>>
MacOs 开发环境设置
查看>>
Mac os远程登录Linux与文件传输
查看>>
Java随机数使用注意事项
查看>>
AngularJs学习日记[3]:ng-init
查看>>
git 删除错误提交的commit
查看>>
java泛型中T、E、K、V、?等含义
查看>>
python 运行 MySQL-python libmysqlclient.so.18: cannot open shared object file: No such file
查看>>
视频播放器推荐
查看>>
[root@AY140716161543837722Z ~]# man top
查看>>
C语言基础及指针⑩预编译及jni.h分析
查看>>
java打开IE浏览器
查看>>
PHP中$this的使用情况
查看>>
webview页面随设备分辨率缩放
查看>>
调侃面向对象编程的23种设计模式
查看>>
8-pandas聚合运算
查看>>
【绿色系统】如何恢复XP“显示桌面”按钮
查看>>
在ubuntu 11.10 下安装谷歌输入法
查看>>
Apache Hive2.1.0安装笔记
查看>>
django中翻译处理国际化方法
查看>>