这里是文章模块栏目内容页
基于Node.js和RabbitMQ搭建消息队列

一.简介

消息队列
消息队列(Message Queue,简称MQ),本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。其主要用途:不同进程Process/线程Thread之间通信。使用消息队列大概有以下原因:

  • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

  • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

不管到底是什么原因催生了消息队列,总之,上面两个猜测是其实际应用的典型场景。综上:

  • 消息队列中的“消息”即指同一台计算机的进程间,或不同计算机的进程间传送的数据。

  • 消息队列是在消息的传输过程中保存消息的容器。

  • 消息被发送到队列中,消息队列充当中间人,将消息从它的源中继到它的目标。消息队列可以保证在高并发状态下数据入库的顺序性和准确性。

RabbitMq
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种。是应用比较广泛和稳定的成熟的消息队列中间件。由于RabbitMQ是基于Erlang开发的,所以天生具有分布式优点。

消息队列应用场景
关于消息队列的应用场景,可以先看这篇文章。这里我们主要模拟一个秒杀的场景。假定我们有100个商品的秒杀活动,现在我们需要保证前1000个发起请求的人能够顺利的记录进入数据库中。这个需求对于http方案是无法顺利完成任务的,下面就需要消息队列来完成。首先我们利用node.js和rabbitMQ搭建服务器,然后利用Siege模拟一个高并发的API请求。看看在高并发请求下http方式和消息队列方式的差异性和准确性。

二.应用

接下来我们就开始进行编码环节。首先我们需要先安装RabbitMQ,这里我们用之前说过的Docker来安装。执行如下命令。我的系统是Mac OS(懒得买服务器就在自己的机器上测试了),如果是linux的话会更好。

sudo docker pull rabbitmq 
#如果rabbitmq镜像下载失败,可以尝试下载rabbitmq:management版本
或者 sudo docker pull rabbitmq:management
#然后用docker启动rabbitmq
sudo docker run -d -e RABBITMQ_NODENAME=my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:management

rabbit服务默认会启动在5672端口,我们把他映射到宿主主机的5672端口。
然后我们需要安装amqplib来在node中连接rabbitmq。我们创建好node的工作目录,然后创建server.js。我们先来看看最简单的消息队列,也就是客户端通过队列把消息传到服务端。在你的工程目录下运行如下命令。

npm install amqplib

打开server.js,我们完成服务端的代码。在这里我们用es5的promise来完成对回调函数的处理。如果不了解promise的可以先去看<a href="http://es6.ruanyifeng.com">ES6详解。</a>

/**
 * Created by wsd on 17/2/23.
 */var amqp = require('amqplib');//首先我们需要通过amqp连接本地的rabbitmq服务,返回一个promise对象amqp.connect('amqp://127.0.0.1').then(function(conn){//进程检测到终端输入CTRL+C退出新号时,关闭RabbitMQ队列。
  process.once('SIGN',function(){
    conn.close();
  });//连接成功后创建通道
  return conn.createChannel().then(function(ch){//通道创建成功后我们通过通道对象的assertQueue方法来监听hello队列,并设置durable持久化为false。这里消息将会被保存在内存中。该方法会返回一个promise对象。
    var ok = ch.assertQueue('hello',{durable:false}).then(function(_qok){//监听创建成功后,我们使用ch.consume创建一个消费者。指定消费hello队列和处理函数,在这里我们简单打印一句话。设置noAck为true表示不对消费结果做出回应。//ch.consume会返回一个promise,这里我们把这个promise赋给ok。
      return ch.consume('hello',function(msg){
        console.log("[x] Received '%s'",msg.content.toString());
      },{noAck:true});
    });//消费者监听完成之后,打印一行成功信息
    return ok.then(function(_consumeOk){
      console.log('[*] Waiting for message. To exit press CRTL+C');
    });
  });}).then(null,console.warn);//如果报错打印报错信息

以上就是服务端的相关代码,下面我们来看客户端。创建client.js。我们还需要安装when来运行promise。运行npm install when。

/**
 * Created by wsd on 17/2/23.
 */var amqp = require('amqplib');var when = require('when');//连接本地消息队列服务amqp.connect('amqp://localhost').then(function(conn){//创建通道,让when立即执行promise
  return when(conn.createChannel().then(function(ch){
    var q = 'hello';
    var msg = 'Hello World';
  //监听q队列,设置持久化为false。
    return ch.assertQueue(q,{durable: false}).then(function(_qok){
  //监听成功后向队列发送消息,这里我们就简单发送一个字符串。发送完毕后关闭通道。
      ch.sendToQueue(q,new Buffer(msg));
      console.log(" [x] Sent '%s'",msg);
      return ch.close()
    });
  })).ensure(function(){ //ensure是promise.finally的别名,不管promise的状态如何都会执行的函数//这里我们把连接关闭
    conn.close();
  });}).then(null,console.warn);

接下来我们启动服务和客户端。

node server.js#[*] Waiting for message. To exit press CRTL+C
node client.js#[x] Sent 'Hello World'

然后我们切换到服务端

#[*] Waiting for message. To exit press CRTL+C#[*] Received 'Hello World'!

至此一个最简单的消息队列搭建完成。下面我们来模拟文章一开始所说的秒杀的场景。我们会基于Http和RabbitMQ两种实现形式做对比。
秒杀活动场景 http模拟
首先我们编写服务模拟前端向server发起请求,这里我们采用koa框架来实现。新建http_web_server.js。

/**
 * Created by wsd on 17/2/23.
 */var koa = require('koa');//一个工具类var util = require('util');var route = require('koa-route');var request = require('request');//这个用于作为用户idvar globalUserId = 1;var app = koa()//用于判断服务是否启动app.use(route.get('/',function *(){
  this.body = 'Hello world';}))//定义请求到后端的URL地址,这里为了方便我就在本机上测试,大家如果有远程服务器的话可以在远程服务器上测试var uri = 'http://127.0.0.1:8000/buy?userid=%d';var timeout = 30 * 1000;//超时30s//设置路由app.use(route.get('/buy',function *(){//用户id简单地每次请求递增1
  var num = globalUserId ++;//调用request发起请求
  request({
    method:'GET',
    timeout:timeout,
    uri:util.format(uri,num)
  },function(error,req_res,body){
    if(error){
      this.status = 500
      this.error = error    }else if(req_res.status != 200){
      this.status = 500
    }else{
      this.body = body    }
  })}))app.listen(5000,function(){
  console.log('server listen on 5000');})

首先我们安装koa,util,koa-route,request四个模块。然后我们模拟向最终入库的server发送生成订单请求。接下来我们完成入库server的相关代码。由于我们需要对数据库操作,所以需要安装mongodb和mongoose模块。

#安装mongodb
brew intall mongodb
#启动mongodb,设置数据的存储路径
mongod --dbpath data/db --logappend
#安装mongoose
npm install mongoose

然后我们首先创建数据库Model文件orderModel.js。

/**
 * Created by hwh on 17/2/23.
 */var mongoose = require('mongoose');//连接到本地开启的mongodb,mongodb默认监听27017端口var connstr = 'mongodb://127.0.0.1:27017/http_vs_rabbit';//设置数据库连接池大小var poolsize = 50;mongoose.connect(connstr,{server:{poolSize:poolsize}})var Schema = mongoose.Schema;var obj = {
  userId:{type:Number, required:true},
  writeTime:{type: Date,default: Date.now()}}var objSchema = new Schema(obj);module.exports = mongoose.model('orders',objSchema);

然后我们创建数据库操作文件orderLib.js。

/**
 * Created by hwh on 17/2/24.
 */var objModel = require('./orderModel.js');//针对generator的存取操作exports.countAll = function(obj){//获得订单总数
  return objModel.count()}exports.insertOneByObj = function(obj){//创建订单
  return objModel.create(obj);}//针对非generator的存取操作exports.countAllNormal = function(obj,cb){
  return objModel.count(obj || {},cb)}exports.insertOneByObjNormal = function(obj,cb){
  return objModel.create(obj || {},cb)}

最后我们创建http_back.js来接收数据并入库。

/**
 * Created by hwh on 17/2/23.
 */var koa = require('koa');var route = require('koa-route');var bodyparser = require('koa-bodyparser');var app = koa();var orderModel = require('./orderModellib.js');var listenPort = 3000;app.use(bodyparser())app.use(route.get('/',function * (){
  this.body = "hello world,listenPort:" + listenPort}));app.use(route.get('/buy',function * (){//拿到参数
  var userid = this.request.query.userid;//获取数据库中订单数量
  var count = yield orderModel.countAll();//做判断,大于100就不再入库
  if (count > 100){
    this.body = 'sold out!';
  }else{
    var model = yield orderModel.insertOneByObj({
      userId:userid    });
    if(model){
      this.body = 'success';
    }
  }}));app.listen(listenPort,function(){
  console.log('Server listening on:',3000);})

这里由于我们需要对body进行解析,所以我们安装koa-bodyparser模块。代码比较简单。接下来我们安装ngnix设置反向代理。

#安装nginx
brew install nginx
#进入nginx目录
cd /usr/local/etc/nginx
#修改配置文件
vi nginx.conf

我们主要设置反响代理相关配置。

#user  wsd;#开启两个nginx进程,等于cpu核心数或者cpu*2worker_processes  2;#error_log  logs/error.log;#error_log  logs/error.log  notice;#error_log  logs/error.log  info;#pid        logs/nginx.pid;events {
    #事件模型,由于是mac系统使用kqueue;linux使用epoll。简单说明一下两种事件模型使用场景。Kqueue和Epoll都属于高效事件模型。
    #Kqueue:使用于FreeBSD 4.1+, OpenBSD 2.9+, NetBSD 2.0 和 MacOS       X.  使用双处理器的MacOS X系统使用kqueue可能会造成内核崩溃。
    #Epoll:使用于Linux内核2.6版本及以后的系统。
    use kqueue;
    #单个工作进程的最大连接数,和硬件配置有关系。
    #尽量大但别超过CPU占用率的90%,这里我们为了测试取值比较小。理论上每台nginx服务器的最大连接数为worker_processes*worker_connections
    worker_connections  2048;}#设定http服务器,利用它的反向代理功能提供负载均衡支持http {
  #设置请求数据格式,这里就使用mime支持的类型
    include       mime.types;
  #http content_type
    default_type  application/octet-stream;
  #暂不储存日志(储存日志需要先使用log_format指令设置日志格式)
    access_log off;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    #指定 nginx 是否调用sendfile 函数(zero copy 方式)来输出文件,对于普通应用,必须设为on。如果用来进行下载等应用磁盘IO重负载应用,可设置为off,以平衡磁盘与网络IO处理速度,降低系统uptime
    sendfile        on;

    #tcp_nopush     on;

    #keepalive_timeout  0;

    #设置超时时间
    keepalive_timeout  65;

    #定义负载均衡设备的Ip及设备状态
    upstream backend {
      server 127.0.0.1:3000;
    }
    #gzip  on;#配置ngnix启动的地址
    server {
        listen       8000;
        server_name  localhost;#这里nginx启动在本机8000端口        #charset koi8-r;

        #access_log  logs/host.access.log  main;#匹配所有路径location / {
            proxy_pass http://backend;#设置负载均衡的地址,这里设置为为backend里面的地址
            
            proxy_redirect default;#设置返回客户端请求头的location的值,默认不设置
            proxy_http_version 1.1;#代理的http协议版本
            root   html;
            index  index.html index.htm;
        }

        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
    include servers/*;
}

写好配置文件以后我们保存然后启动nginx。

#启动nginx
nginx

我们访问http://localhost:8000看看反向代理是否运行正常。如果正常的话会输出server listen on 3000。

http测试
接下来我们就要对这个http服务进行压力测试了。这里我们使用siege。

#安装wget
brew install wget
#下载siege
wget http://download.joedog.org/siege/siege-latest.tar.gz
#解压
tar -zxvf siege-latest.tar.gz
#进入siege脚本目录
cd siege-4.0.2
#配置
./configure
#编译并安装
make && make install

可以输入siege -help查看siege支持的命令。这里我们主要用到-c(指定并发数)和-r(指定测试次数)。现在我们分别模拟100、200、300个并发,并循环发送10次。当然别忘记每次我们请求完毕后在下一次请求开始前要把数据库清空。

siege -c 100 -r 10 -q http://192.168.1.150:5000/buy
siege -c 200 -r 10 -q http://192.168.1.150:5000/buy
siege -c 300 -r 10 -q http://192.168.1.150:5000/buy

每完成一次并发操作,我们使用mongo命令连接到本地的mongodb服务。并且查看数据库里面订单的数量。

#连接数据库
mongo
#选择数据库
use http_vs_rabbit
#查看集合数量
 db.orders.count()

下面是每一次并发操作后,数据库中订单的数量

//100次并发114//200次并发125//300次并发119

可以看到,每一次并发订单数量都会超出预订值。下面是一些参数:

Date & Time            Trans    Elap Time     Data Trans  Resp Time  Trans Rate    Throughput    Concurrent  OKAY   Failed
2017-03-08 15:09:47,   1000,       3.62,           0,       0.01,      276.24,        0.00,        1.49,       0,       0
2017-03-08 15:10:48,   2000,       3.96,           0,       0.01,      505.05,        0.00,        3.10,       0,       0
2017-03-08 15:11:27,   3000,       4.19,           0,       0.01,      715.99,        0.00,        8.64,       0,       0

看一下上述各参数的意思。

  • Date & Time:请求时间

  • Trans:请求总数

  • Elap Time:测试用时

  • Data Trans:测试传输数据量

  • Resp Time:平均响应时间

  • Trans Rate:每秒事务处理量

  • Throughput:吞吐率

  • Concurrent:并发用户数

  • OKAY:成功数

  • Failed:失败数

可以看到在并发测试中,http处理事务的速度虽然不错,但并不能保证结果的准确和可靠。下面我们来看一下利用rabbitmq的测试结果。首先我们也是像http一样,写一个服务模拟前端请求。这里我们新建rabbit_web_server.js。

/**
 * Created by wsd on 17/2/24.
 */var koa = require('koa');var router = require('koa-route');var amqp = require('amqplib');var uuid = require('node-uuid');var app = koa();var correlationId = uuid();var q = 'fibq';//前端发送消息队列var q2 = 'ackq';//后台回复队列//conn写成全局变量,循环利用。否则每次访问路由都会创建connvar conn;//依然id每次请求递增1var globalUserId = 1;app.use(router.get('/',function * (){
  this.body = 'hello world';}));app.use(router.get('/buy/',function*(){
  var num = globalUserId ++;
  //conn我们在外部创建,并且只创建一次(复用)
  conn.createChannel().then(function(ch){
    //监听q2队列(订单量如果到达100,服务端会通过q2队列返回信息)
    return ok = ch.assertQueue(q2,{durable:false}).then(function(){
      //创建消费q2队列,这里简单把信息设置到res的body里
      ch.consume(q2,function(msg){
        console.log(msg.content.toString());
        this.body = msg.content.toString();
        ch.close();
      },{noAck:true});
      //发送消息到q队列,这里把订单id作为content。把q2队列的name和uuid也传过去,这里uuid用来做消息的关联id
      ch.sendToQueue(q,new Buffer(num.toString()),{replyTo:q2,correlationId:correlationId})
    });
  }).then(null,console.error);}));amqp.connect('amqp://127.0.0.1').then(function(_conn){
  conn = _conn;});app.listen(5001,function(){
  console.log('server listen on 5001');});

下面我们在新建rabbit_mq_server.js文件,来写入库的操作。

/**
 * Created by hwh on 17/2/24.
 */var amqp = require('amqplib');var co = require('co');var orderModel = require('./orderModellib');var q = 'fibq';amqp.connect('amqp://127.0.0.1').then(function(conn){
  process.once('SIGN',function(){
    conn.close();
  });
  return conn.createChannel().then(function(ch){//设置公平调度,这里是指rabbitmq不会向一个繁忙的队列推送超过1条消息。
    ch.prefetch(1);
    //定义回传消息函数
    var ackSend = function(msg,content){
      //要注意这里我们之前传上来的队列名和uuid会被保存在msg对象的properties中
      //因为服务端并不知道回传的队列名字,所以我们需要把它带过来
      ch.sendToQueue(msg.properties.replyTo,new Buffer(content.toString()),
          {correlationId:msg.properties.correlationId});
      //ack表示消息确认机制。这里我们告诉rabbitmq消息接收成功。
      ch.ack(msg);
    }
    //定义收到消息的处理函数
    var reply = function (msg){
      var userid = parseInt(msg.content.toString());
      //这里由于consume的处理函数不支持generator语法,这里我们就用es5的方式访问数据库、
      orderModel.countAllNormal({},function(err,count){
        if(count >= 100){
          return ackSend(msg,'sold out!');
        }else{
          orderModel.insertOneByObjNormal({
            userId:userid          },function(err,model){
            return ackSend(msg,"buy success,orderid:"+model._id.toString())
          });
        }
      });
    };
    //监听队列q并消费
    var ok = ch.assertQueue(q,{durable:false}).then(function(){
      ch.consume(q,reply,{noAck:false});
    });
    return ok.then(function(){
      console.log(' [*] waiting for message')
    })
  })}).then(null,console.error);

分别启动rabbit_web_server.js和rabbit_mq_server.js。接下来我们还是像之前测试http服务一样,用siege模拟100、200、300次并发。要注意这里我们的服务变成了5001端口。
可以看到不管多少并发数下,我们数据库里的订单都是100。这保证了我们数据的准确性。下面是siege记录的参数:

Date & Time            Trans    Elap Time     Data Trans  Resp Time  Trans Rate    Throughput    Concurrent  OKAY   Failed
2017-03-08 16:36:06,   1000,       3.81,           0,       0.01,      262.47,        0.00,        2.17,       0,       0
2017-03-08 16:40:50,   2000,       4.15,           0,       0.02,      481.93,        0.00,        9.16,       0,       0
2017-03-08 16:41:03,   3000,       4.47,           0,       0.08,      671.14,        0.00,       51.91,       0,       0

对比http,对于高并发的操作,确实队列在耗时,每秒事务处理量和响应时间上会比http略逊一筹。由于node.js的异步I/O,所以http会存在插入超量的情况。因为很有可能你在异步往数据库里面插入数据还没有完成的时候,下一个请求已经过来了。但队列保证了结果的准确性,这在秒杀场景以及一些特殊场景是硬性要求。这是一个很常见的场景,因此掌握消息队列的操作是作为服务端开发来说必不可少的。

在这里,为了提升rabbitmq的性能,我们可以开启多个rabbitmq进程。这个就交给大家下去测试吧。

rabbitmq还有以下几种应用场景:

  • 一个生产者多个消费者

    • 轮询


      Paste_Image.png

    • 广播(faout)


      Paste_Image.png

    • 路由(direct)

Paste_Image.png

  • RPC远程调用

Paste_Image.png

  • 跨平台通信(比如node和python、java)

这些会在我后面的文章中讲解,敬请期待。

48人点赞

日记本



作者:sidiWang
链接:https://www.jianshu.com/p/a4d92d0d7e19
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


更多栏目
相关内容