用redis实现openresty的缓存同步

jujusharp  •  Mar 30, 2019 2:12:43 PM

原文地址


用redis实现openresty的缓存同步

"一切单机缓存都是魔鬼,与其被消灭,不如与其共舞"

之前接到我们uAuth的一个bug,具体原因为,当一个用户改密后,原token理应失效,但是线上时常会有原token访问的正常的情况。 可是在测试环境上,确无论如何也复现不出来。

后来仔细分析了源码,是由于token的存储用了openresty的缓存,当token失效后,只在线上的n台服务器中的一台做了失效处理,而其他的n-1台的缓存仍然有效。

缓存不一致 ———— 这确实是好多场景容易碰到的问题,那么怎么办?解决方式有二:

  1. 干掉openresty的缓存,将存储设计由openresty缓存/redis/mysql 改为redis/mysql 两层结构。

  2. 设计openresty的缓存同步机制,从根儿上解决这个问题。

方式1,确实是简单直接有效的方式,but:

使用openresty,不让我用缓存,那和一条咸鱼有什么区别?

于是,我选择了第2种方式,那么问题来了,如何设计这个同步机制 ? 经典的同步肯定是发布/订阅来搞定,第一时间自然想到了kafka,可是查了一圈发现openresty中官方的resty.kafka只支持生产,并且大多数场景都是用来日志记录。到时redis,有一个经典的subscribe例子,那么好,就这么干。

封装发布/订阅操作

既然同步,那咱就整到位。 第一步,封装一个redis_message.lua

  1. local redis_c = require "resty.redis"

  2. local cjson = require 'cjson.safe'

  3. local M = {}

  4. local mt = {__index = M}

  5. function M:new(cfg)

  6. local ins = {

  7. timeout = cfg.timeout or 60000,

  8. pool = cfg.pool or {maxIdleTime = 120000,size = 200},

  9. database = cfg.database or 0,

  10. host = cfg .host,

  11. port = cfg. port,

  12. password = cfg .password or ""

  13. }

  14. setmetatable(ins,mt)

  15. return ins

  16. end

  17. local function get_con(cfg)

  18. local red = redis_c:new()

  19. red:set_timeout(cfg.timeout)

  20. local ok,err = red:connect(cfg.host,cfg.port)

  21. if not ok then

  22. return nil

  23. end

  24. local count ,err = red:get_reused_times()

  25. if 0 == count then

  26. ok ,err = red:auth(cfg.password)

  27. elseif err then

  28. return nil

  29. end

  30. red:select(cfg.database)

  31. return red

  32. end

  33. local function keep_alive(red,cfg)

  34. local ok,err = red:set_keepalive(cfg.pool.maxIdleTime,cfg.pool.size)

  35. if not ok then

  36. red:close()

  37. end

  38. return true

  39. end

  40. function M:subscribe(key,func)

  41. local co = coroutine.create(function()

  42. local red = get_con(self)

  43. local ok,err = red:subscribe(key)

  44. if not ok then

  45. return err

  46. end

  47. local flag = true

  48. while flag do

  49. local res,err = red:read_reply()

  50. if err then

  51. ;

  52. else

  53. if res[1] == "message" then

  54. local obj = cjson.decode(res[3])

  55. flag = func(obj.msg)

  56. end

  57. end

  58. red:set_keepalive(100,100)

  59. end

  60. end)

  61. coroutine.resume(co)

  62. end

  63. function M:publish(key,msg)

  64. local red = get_con(self)

  65. local obj = {}

  66. obj.type = type(msg)

  67. obj.msg = msg

  68. local ok,err = red:publish(key,cjson.encode(obj))

  69. if not ok then

  70. return false

  71. else

  72. return true

  73. end

  74. keep_alive(red,self)

  75. end

  76. return M

这个messagel.lua里,有几个点可以关注一下:

  1. redis的的subscribe操作是subcribe CHANNEL

  2. 当subscribe收到相应的信息后,是一个数组,依次为[事件,通道,数据],因此我们只从简单考虑事件的情况,即 message这种情况。

  3. 我们对外封装是subcribe后传入一个函数的,如果这个函数返回false就停止订阅。

设计消息格式

为了保证缓存操作的通用性,我们设计消息格式为:

  1. local msg = {key = key ,cache = cache_name,op=op,data=data,timeout=timeout}

在这其中:

  1. cache表示我们要同步的缓存名称。

  2. key表明了要同步的缓存key值

  3. op设置了三种,分别是set,expire,del

  4. 根据op的不同,可以选择性的传入data,timeout(设置超时使用)

加入一个配置,我这里保存的是app/config/cacheSync

  1. return {

  2. redis = {

  3. host="10.10.10.111",

  4. port=6379,

  5. database = 3,

  6. password = "Pa88word"

  7. },

  8. queueName='lua:tiny:cache:sync',

  9. }

封装业务层的发布操作

在封装好底层库,设计好消息格式后,我们就可以封装业务层的操作了。即:

  1. local cfg = require('app.config.cacheSync') --同步的配置,包括服务器

  2. local redis_message = require('libs.redis.redis_message') --上文封装的message

  3. local function async_cache(cache_name,key,op,data,timeout)

  4. local rm = redis_message:new(cfg.redis)

  5. local message = {key = key ,cache = cache_name,op=op}

  6. if data then

  7. message .data = data

  8. end

  9. if timeout then

  10. message.timeout = timeout

  11. end

  12. rm:publish(cfg.queueName,message)

  13. end

封装订阅操作

我们的订阅操作放在initbylua_file的生命周期中,话不多说代码如下:

  1. local cfg = require('app.config.cacheSync') --同步的配置,包括服务器

  2. local redis_message = require('libs.redis.redis_message') --上文封装的message

  3. local cjson = require('cjson.safe')

  4. local function handler(msg)

  5. local key = msg.key

  6. local shared = ngx.shared[msg.cache]

  7. if shared ~= nil and shared ~= ngx.null then

  8. if msg.op == 'del' then

  9. shared:delete(key)

  10. elseif msg.op == 'set' then

  11. local data = cjson.encode(msg.data)

  12. if data == nil or data == ngx.null then

  13. data = msg.data

  14. end

  15. local res ,msg = shared:set(key,data)

  16. if msg then

  17. ngx.log(ngx.ERR,msg)

  18. end

  19. elseif msg.op =='expire' then

  20. shared:set(key,cjson.encode(msg.data),msg.timeout)

  21. end

  22. end

  23. return true

  24. end

  25. local req_id = ngx.worker.id()

  26. if req_id == 0 then

  27. ngx.timer.at(0,function()

  28. local message = redis_message:new(cb_cfg.redis)

  29. message:subscribe(cfg.queueName,handler)

  30. end)

  31. end

OK,至此为止,我们成功的实现了openresty的缓存同步,并且初步有了一套可复用的组件,当然这里还有几个tips:

  1. 需要在conf文件中设置 lua_socket_log_errors off; 否则nginx的error文件会一直报timeout错误。

  2. 没有考虑redis的可靠性问题。

关于老拐瘦

散养程序猿,野生架构狮
二流搬砖工,三流摄影师
假正经真逗比,装文艺实二逼
所以,这么一个公众号里

有代码,有段子

有美图,有鸡汤

反正,乱七八遭的,没准碰上哪个刚好就烦到您了呢

啥也不说,扫码关注吧

0 回复
暂时没有回复,你也许会成为第一个哦