关于redis:Redis-实战-09-实现任务队列消息拉取和文件分发

4次阅读

共计 4591 个字符,预计需要花费 12 分钟才能阅读完成。

工作队列 P133

通过将待执行工作的相干信息放入队列外面,并在之后对队列进行解决,能够推延执行那些耗时对操作,这种将工作交给工作处理器来执行对做法被称为工作队列 (task queue)。P133

先进先出队列 P133

能够 Redis 的列表构造存储工作的相干信息,并应用 RPUSH 将待执行工作的相干信息推入列表右端,应用阻塞版本的弹出命令 BLPOP 从队列中弹出待执行工作的相干信息(因为工作处理器除了执行工作不须要执行其余工作)。P134

发送工作

// 将工作参数推入指定工作对应的列表右端
func SendTask(conn redis.Conn, queueName string, param string) (bool, error) {count, err := redis.Int(conn.Do("RPUSH", queueName, param))
    if err != nil {return false, nil}
    // 只有胜利推入 1 个才算胜利发送
    return count == 1, nil
}

执行工作

// 一直从工作对应的列表中获取工作参数,并执行工作
func RunTask(conn redis.Conn, queueName string, taskHandler func(param string)) {
    for ; ; {result, err := redis.Strings(conn.Do("BLPOP", queueName, 10))
        // 如果胜利获取工作信息,则执行工作
        if err != nil && len(result) == 2 {taskHandler(result[1])
        }
    }
}

以上代码是工作队列与 Redis 交互的通用版本,应用形式简略,只须要将入参信息序列化成字符串传入即可发送一个工作,提供一个解决工作的办法回调即可执行工作。

工作优先级 P136

在此基础上能够讲原有的先进先出工作队列改为具备优先级的工作队列,即高优先级的工作须要在低优先级的工作之前执行。BLPOP 将弹出第一个非空列表的第一个元素,所以咱们只须要将所有工作队列名数组依照优先级降序排序,让工作队列名数组作为 BLPOP 的入参即可实现上述性能(当然这种如果高优先级工作的生成速率大于生产速率,那么低优先级的工作就永远不会执行)。P136

优先执行高优先级工作

// 一直从工作对应的列表中获取工作参数,并执行工作
// queueNames 从前往后的优先级顺次升高
func RunTasks(conn redis.Conn, queueNames []string, queueNameToTaskHandler map[string]func(param string)) {
    // 校验是否所有工作都有对应的解决办法
    for _, queueName := range queueNames {if _, exists := queueNameToTaskHandler[queueName]; !exists {panic(fmt.Sprintf("queueName(%v) not in queueNameToTaskHandler", queueName))
        }
    }
    // 将所有入参放入同一个数组
    length := len(queueNames)
    args := make([]interface{}, length + 1)
    for i := 0; i < length; i++ {args[i] = queueNames[i]
    }
    args[length] = 10
    for ; ; {result, err := redis.Strings(conn.Do("BLPOP", args...))
        // 如果胜利获取工作信息,则执行工作
        if err != nil && len(result) == 2 {
            // 找到对应的解决办法并执行
            taskHandler := queueNameToTaskHandler[result[0]]
            taskHandler(result[1])
        }
    }
}
提早工作 P136

理论业务场景中还存在某些工作须要在指定工夫进行操作,例如:邮件定时发送等。此时还须要存储工作执行的工夫,并将能够执行的工作放入刚刚的工作队列中。能够应用有序汇合进行存储,工夫戳作为分值,工作相干信息及队列名等信息的 json 串作为键。

发送提早工作

// 存储提早工作的相干信息,用于序列化和反序列化
type delayedTaskInfo struct {
    UnixNano  int64  `json:"unixNano"`
    QueueName string `json:"queueName"`
    Param     string `json:"param"`
}
// 发送一个提早工作
func SendDelayedTask(conn redis.Conn, queueName string, param string, executeAt time.Time) (bool, error) {
    // 如果已到执行工夫,则间接发送到工作队列
    if executeAt.UnixNano() <= time.Now().UnixNano() {return SendTask(conn, queueName, param)
    }
    // 还未到执行工夫,须要放入有序汇合
    // 序列化相干信息
    infoJson, err := json.Marshal(delayedTaskInfo{UnixNano: time.Now().UnixNano(),
        QueueName:queueName,
        Param:param,
    })
    if err != nil {return false, err}
    // 放入有序汇合
    count, err := redis.Int(conn.Do("ZADD", "delayed_tasks", infoJson, executeAt.UnixNano()))
    if err != nil {return false, err}
    // 只有胜利退出 1 个才算胜利
    return count == 1, nil
}

拉取可执行的提早工作,放入工作队列

// 轮询提早工作,将可执行的工作放入工作队列
func PollDelayedTask(conn redis.Conn) {
    for ; ; {
        // 获取最早须要执行的工作
        infoMap, err := redis.StringMap(conn.Do("ZRANGE", "delayed_tasks", 0, 0, "WITHSCORES"))
        if err != nil || len(infoMap) != 1 {
            // 睡 1ms 再持续
            time.Sleep(time.Millisecond)
            continue
        }
        for infoJson, unixNano := range infoMap {
            // 已到工夫,放入工作队列
            executeAt, err := strconv.Atoi(unixNano)
            if err != nil {log.Errorf("#PollDelayedTask -> convert unixNano to int error, infoJson: %v, unixNano: %v", infoJson, unixNano)
                // 做一些后续解决,例如:删除该条信息,避免耽搁其余提早工作
            }
            if int64(executeAt) <= time.Now().UnixNano() {
                // 反序列化
                info := new(delayedTaskInfo)
                err := json.Unmarshal([]byte(infoJson), info)
                if err != nil {log.Errorf("#PollDelayedTask -> infoJson unmarshal error, infoJson: %v, unixNano: %v", infoJson, unixNano)
                    // 做一些后续解决,例如:删除该条信息,避免耽搁其余提早工作
                }
                // 从有序汇合删除该信息,并放入工作队列
                count, err := redis.Int(conn.Do("ZREM", "delayed_tasks", infoJson))
                if err != nil && count == 1 {_, _ = SendTask(conn, info.QueueName, info.Param)
                }
            } else {
                // 未到工夫,睡 1ms 再持续
                time.Sleep(time.Millisecond)
            }
        }
    }
}

有序汇合不具备列表的阻塞弹出机制,所以程序须要一直循环,并尝试从队列中获取要被执行的工作,这一操作会增大网络和处理器的负载。能够通过在函数外面减少一个自适应办法 (adaptive method),让函数在一段时间内都没有发现可执行的工作时,主动缩短休眠工夫,或者依据下一个工作的执行工夫来决定休眠的时长,并将休眠时长的最大值限度为 100ms,从而确保工作能够被及时执行。P138

音讯拉取 P139

两个或多个客户端在相互发送和接管音讯的时候,通常会应用以下两种办法来传递信息:P139

  • 音讯推送 (push messaging):即由发送者来确保所有接受者曾经胜利接管到了音讯。Redis 内置了用于进行音讯推送的 PUBLISH 命令和 SUBSCRIBE 命令(05. Redis 其余命令简介 介绍了这两个命令的用法和缺点)
  • 音讯拉取 (pull messaging):即由接受者本人去获取存储的信息
单个接受者 P140

单个接受者时,只须要将发送的信息保留至每个接收者对应的列表中即可,应用 RPUSH 能够向执行接受者发送音讯,应用 LTRIM 能够移除列表中的前几个元素来获取收到的音讯。P140

多个接受者 P141

多个接受者的状况相似群组,即群组内的人发消息,其他人都能够收到。咱们能够应用以下几个数据结构存储所需数据,以便实现咱们的所需的性能:

  • STRING: 群组的音讯自增 id

    • INCR: 实现 id 自增并获取
  • ZSET: 存储该群组中的每一条信息,分值为以后群组内的音讯自增 id

    • ZRANGEBYSCORE: 取得未获取的音讯
  • ZSET: 存储该群组中每个人取得的最新一条音讯的 id,所有音讯均未获取时为 0

    • ZCARD: 获取群组人数
    • ZRANGE: 通过解决后,可实现哪些音讯胜利被哪些人接管了的性能
    • ZRANGE: 获取 id 最小数据,可实现删除被所有人获取过的音讯的性能
  • ZSET: 存储一个人所有群组取得的最新一条音讯的 id,来到群组时主动删除,退出群组时初始化为 0

    • ZCARD: 获取所在的群组个数
    • ZRANGE: 通过解决后,可实现批量拉取所有群组的未获取的音讯的性能

文件散发 P145

依据地理位置聚合用户数据 P146

当初领有每个 ip 每天进行流动的工夫和具体操作,现须要计算每天每个城市的人操作数量(相似于统计日活)。

原始数据非常微小,所以须要分批读入内存进行聚合统计,而聚合后的数据相对来说很小,所以齐全能够在内存中进行聚合统计,实现后再将后果写入 Redis 中,能够无效缩小程序与 Redis 服务的通信次数,缩短工作工夫。

日志散发及解决

当初有一台机器的本地日志须要交给多个日志处理器进行不同的剖析。

这种场景相似群组,所以咱们能够复用下面提到的反对多个接受者的音讯拉取组件。

本地机器:

  1. 将所有日志发送至群组,最初再发送一条完结音讯
  2. 期待所有日志处理器解决完(群组对应的实现标识 = 群组内的成员数 – 1)
  3. 清理本次发送的所有日志

日志处理器:

  1. 一直从群组中拉取音讯,并进入相干解决,直至拉取到完结音讯
  2. 对群组对应的实现标识进行 INCR,示意以后日志处理器已实现解决

本文首发于公众号:满赋诸机(点击查看原文)开源在 GitHub:reading-notes/redis-in-action

正文完
 0