一、基本介绍


  • MQ库实现了各类消息代理中间件(Message Broker)的连接协议, 目前支持:redismqttstomp协议.

  • MQ库基于上述协议实现了: 生产者 -> 消费者与订阅 -> 发布模型, 可以在不依赖其它服务的情况下独立完成任务.

  • MQ库可以作为消费者与生产者来解决WebApp实例通信与订阅/发布的工作解耦, 完成多进程与跨实例通信的工作.

二、API学习


MQ库中有三种对象可供使用者自行导入. 具体的导入方法为:

  • 基于redis协议实现的消息队列: local MQ = require "MQ.redis"

  • 基于mqtt协议实现的消息队列: local MQ = require "MQ.mqtt"

  • 基于stomp协议实现的消息队列: local MQ = require "MQ.stomp"

1. MQ:new


函数原型: MQ:new(opt)

opt是一个table类型的参数, 里面可以包含如下内容:

  • host - String类型, 消息队列的域名或者IP地址.

  • port - Interge类型, 消息队列监听的端口.

  • auth - String类型, 如果后端MQ是Redis, 配置了连接口令则需要配置(可选);

  • db - Integer类型, 如果后端MQ是Redis, db范围在(0-16)(可选);

  • username - String类型, 仅在stomp/mqtt协议下用作登录认证(没有可以不填写).

  • password - String类型, 仅在stomp/mqtt协议下用作登录认证(没有可以不填写).

  • vhost - String类型, 仅在使用某些特定消息队列server的时候填写(例如:rabbit).

  • keepalive - Integer类型, 仅在使用mqtt的时候用来出发客户端主动发出心跳包的时间.

    此方法将会创建一个的MQ对象实例.

2. MQ:on


函数原型: MQ:on(pattern, callback)

此方法用来订阅一个指定pattern. 当接收到来自broker的推送消息后, callback将会被调用.

callback会被注入一个table类型的参数, 此参数将在断开连接的时候为nil(可用来判断服务器是否断开).

msg参数会根据采用的协议的不同, 其内部包含的属性也有所不同.

3. MQ:emit


函数原型: MQ:emit(pattern, msg)

此方法用来向指定pattern发送消息, msgString类型.

单个MQ可以一直复用emit, 内部会创建一个写入队列去完成消息的顺序发送.

4. MQ:clsoe


函数原型: MQ:close()

此方法可以关闭不再使用的MQ; 在任何情况下, MQ对象使用完毕后都需要调用此方法来释放资源.

5. MQ:start


函数原型: MQ:start()

此方法在作为独立运行服务端时候调用.

三、使用示例


1. 通用消息队列


local MQ = require "MQ.stomp"
-- local MQ = require "MQ.redis"
-- local MQ = require "MQ.mqtt"

local cf = require "cf"
require "utils"

local mq = MQ:new { host = 'localhost', port = 6379 }

mq:on('/test', function (msg)
    print("收到来自/test的消息.")
    var_dump(msg)
end)

mq:on('/admin', function (msg)
    print("收到来自/admin的消息.")
    var_dump(msg)
end)

cf.at(0.1, function (args)
    print(mq:emit('/test', '{"code":'..math.random(1, 100)..',"from":"/test"}'))
    print(mq:emit('/admin', '{"code":'..math.random(1, 100)..',"from":"/admin"}'))
end)

mq:start()

2. Websocket订阅


local class = require "class"
local mq = require "MQ.redis"
local cf = require "cf"
local json = require "json"
local websocket = class("websocket")

function websocket:ctor(opt)
    self.ws = opt.ws             -- websocket对象
    self.send_masked = false     -- 掩码(默认为false, 不建议修改或者使用)
    self.max_payload_len = 65535 -- 最大有效载荷长度(默认为65535, 不建议修改或者使用)
    self.timeout = 15            -- 默认为一直等待, 非number类型会导致异常.
    self.count = 0
    self.mq = mq:new {
      host = "localhost",
      port = 6479,
      -- auth = 'admin',
      -- db = 0,
    }
end

function websocket:on_open()
    print('on_open')
    self.timer = cf.at(0.01, function ( ... ) -- 定时器
        self.count = self.count + 1
        self.ws:send(tostring(self.count))
    end)
    self.mq:on('/chat', function(msg)
      if not msg then
        return
      end
      self.ws:send(msg.payload)
    end)
end

function websocket:on_message(data, typ)
    print('on_message', self.ws, data)
    self.ws:send('welcome')
    -- self.ws:close(data)
end

function websocket:on_error(error)
    print('on_error', self.ws, error)
end

function websocket:on_close(data)
    print('on_close', self.ws, data)
    if self.timer then -- 清理定时器
      print("清理定时器")
      self.timer:stop()
      self.timer = nil
    end
    if self.mq then
      self.mq:close()
    end
end

return websocket

四、协议兼容性测试


  • mqtt - 在emqx 3.x上测试通过.

  • redis - 在redis 3.x以上版本测试通过.

  • stomp - 在rabbitmq 3.x上测试通过.

五、使用注意


MQ库实现的消息队列的设计是基于1 -> N的, 这可能会非常适合某些基于组播广播的应用业务场景.

MQ内部的实现是基于任务串行化思维的设计的, 所以每个订阅在回调函数的返回才意味着当前任务处理完毕.

简单来说: 我们的A订阅B订阅是可以被并发调用的. 但当前任务未处理完之前, 各自都不会处理后续的其它任务.

Copyright © CandyMi 2019-2022 all right reserved,powered by Gitbook该文件修订时间: 2021-03-26 23:50:39

results matching ""

    No results matching ""