关于升级:CS架构中-agent如何自升级以falconagent为例

24次阅读

共计 7985 个字符,预计需要花费 20 分钟才能阅读完成。

前言

在咱们日常运维 / 运维开发工作中各种零碎次要分为两大流派

  • 本文次要探讨下有 agent 侧一些注意事项

客户端服务端的 C / S 架构

长处

  • c/ s 架构相比于基于 ssh 的并发和吞吐量要高的多
  • 利用 agent 可做的事件很多以及更精准的管制

毛病

  • 性能更新须要降级 agent
  • agent 如果保活是个头疼的问题
  • 机器上 agent 过多如何治理又是个问题

agentless 架构

特点无侵入性 agent: 典型利用就是基于 ssh ansible

长处

  • 无 agent 不须要关怀保活和笼罩问题
  • 性能更新次要在 server 端实现

毛病

  • 基于 ssh 的性能 / 性能较差

经典 client 案例

配置管理 / 批量操作

  • Saltstack Minion
  • Puppet Agent

监控

  • prometheus 各种各样的 exporter: node_exporter
  • open-falcon falcon-agent
  • Zabbix Agent

C/ S 架构中 agent 侧注意事项

agent 资源耗费

代码该当简洁,防止过多资源耗费

agent 资源监控能够应用 prometheus 的 client_golang,默认会 export 过程的 cpu_user、fd、mem 等信息帮忙咱们定位资源耗费

# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 38913.32
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 6.815744e+06
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 15
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 1.4659584e+07
# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1.59350253732e+09
# HELP process_virtual_memory_bytes Virtual memory size in bytes.
# TYPE process_virtual_memory_bytes gauge
process_virtual_memory_bytes 1.201352704e+09
# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes.
# TYPE process_virtual_memory_max_bytes gauge
process_virtual_memory_max_bytes -1

agent 如何降级治理

举例: 当初要降级 agent 版本 from v1.0 to v1.1

思路一: 应用管理工具治理

如 ansible-playbook 能够参考我之前的文章 应用 ansible-playbook 实现 dnsdist 疾速劫持工具

咱们能够应用上面 python 代码将跑 playbook 封装成一个办法,应用的时候只须要传入 ip 列表,yaml,和额定的变量 dict即可
咳咳: 这个计划典型问题就是受限于单个 ansible 性能问题(很多小伙伴都被折磨过吧),当然能够将大量的 ip 列表分片分发给多个 ansible-server 执行,再将后果 merge 一下

    t = PlaybookApi([ip], yaml_path, {"conf_dir": conf_dir, "bk_file_name": bk_file_name})
    t.run()
from collections import namedtuple

from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.utils.vars import load_extra_vars
from ansible.utils.vars import load_options_vars
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase


class ResultsCollector(CallbackBase):
    def __init__(self, *args, **kwargs):
        super(ResultsCollector, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        self.host_failed[result._host.get_name()] = result


# class PlaybookApi(PlaybookExecutor):
class PlaybookApi(PlaybookExecutor):
    def __init__(self, host_list, yaml_path, extra_vars):
        self.host_list = host_list
        self.yaml_path = yaml_path
        # self.kcache_path = kcache_path

        self.callback = ResultsCollector()
        self.extra_vars = extra_vars
        self.IpmiPlay()
        super(PlaybookApi, self).__init__(playbooks=[self.yaml_path], inventory=self.inventory,
                                          variable_manager=self.variable_manager,
                                          loader=self.loader, options=self.options, passwords={})
        self._tqm._stdout_callback = self.callback

    def IpmiPlay(self):
        Options = namedtuple('Options',
                             ['listtags', 'listtasks', 'listhosts', 'syntax', 'connection', 'module_path', 'forks',
                              'remote_user', 'private_key_file', 'ssh_common_args', 'ssh_extra_args',
                              'sftp_extra_args', 'scp_extra_args', 'become',
                              'become_method',
                              'become_user',
                              'verbosity', 'check', 'extra_vars'])
        self.options = Options(listtags=False, listtasks=False, listhosts=False, syntax=False, connection='ssh',
                               module_path=None,
                               forks=10, remote_user='',
                               private_key_file=None,
                               ssh_common_args='',
                               ssh_extra_args='',
                               sftp_extra_args='',
                               scp_extra_args='',
                               become=True,
                               become_method='sudo',
                               become_user='root',
                               verbosity=3,
                               check=False,
                               extra_vars={})

        self.loader = DataLoader()

        # create the variable manager, which will be shared throughout
        # the code, ensuring a consistent view of global variables
        variable_manager = VariableManager()
        variable_manager.extra_vars = load_extra_vars(loader=self.loader, options=self.options)
        variable_manager.options_vars = load_options_vars(self.options)
        self.variable_manager = variable_manager
        # create the inventory, and filter it based on the subset specified (if any)
        self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=self.host_list)
        self.variable_manager.set_inventory(self.inventory)
        self.variable_manager.extra_vars = self.extra_vars

    def get_result(self):
        # print("calling in get_result")
        self.results_raw = {'success': {}, 'failed': {}, "unreachable": {}}
        for host, result in self.callback.host_ok.items():
            self.results_raw['success'][host] = result
        for host, result in self.callback.host_failed.items():
            self.results_raw['failed'][host] = result
        for host, result in self.callback.host_unreachable.items():
            self.results_raw['unreachable'][host] = result._result['msg']
        return self.results_raw


if __name__ == '__main__':
    h = ["127.0.0.1"]
    yaml = "systemd_stop.yaml"

    api = PlaybookApi(h, yaml, {"app": "falcon-judge"})
    api.run()
    res = api.get_result()
    for k, v in res.items():
        for kk, vv in v.items():
            print(kk, vv._result)

思路二: 代码中实现自降级

以 falcon-agent 代码为例,代码地址 https://github.com/ning1875/falcon-plus/tree/master/modules/agent 整体实现流程:
ps: 原谅我那蜘蛛爬的字吧

实现剖析

  • 机器上个别会选用一种 daemontools 作为服务托管工具: 如 supervisor 和 systemd,而 systemd 更广泛些
  • 在机器上跑的服务最小可由三个文件组成: 一个二进制可执行文件、一个配置文件、一个 service 文件
  • 所以服务的自降级就是这三个文件的降级
  • 文件降级完后如何重启服务呢: 以 systemd 为例只须要发送 term 信号给本身过程即可,即 kill 过程 pid

    pid := os.Getpid()
    thisPro, _ := os.FindProcess(pid)
    thisPro.Signal(os.Kill)
  • agent 如何治理版本: 在 const 中指定

    // changelog:
    // 3.1.3: code refactor
    // 3.1.4: bugfix ignore configuration
    // 5.0.0: 反对通过配置管制是否开启 /run 接口;收集 udp 流量数据;du 某个目录的大小
    // 5.1.0: 同步插件的时候不再应用 checksum 机制
    // 5.1.1: 修复往多个 transfer 发送数据的时候 crash 的问题
    // 5.1.2: ignore mount point when blocks=0
    // 6.0.0: agent 自降级, 新增一些监控项
    // 6.0.1: agent collect level
    // 6.0.2: 增加单核监控开关默认不关上, 单核监控 tag 变更为 core=core0x , 增加 mem.available.percent
    // 6.0.3: 减少 sys.uptime
    // 6.0.4: 修复 cpu.iowait>100 的 bug
    // 6.0.5: 增加过程采集监控,距离 30s
    // 6.0.6: 调整内置的采集 func 距离 disk io 相干和 tcp 10s-->30s,agent_version 整数代表以后版本, 去掉动静监控办法
    // 6.0.7: ntp 反对 chronyc,服务监控 rpc call 距离调整为一分钟
    // 6.0.8: 批改监控项抓取工夫距离,10s 只保留 cpu,解决断点问题
    // 6.0.9: 修复 dfa dfb 块设施采集, 修复不同版本 ss- s 的 bug
    // 6.1.0: 修复机器上主机名被改 case, 使 ip 转化为 nxx-xx-xx 的模式
    const (
        VERSION          = "6.1.0"
        COLLECT_INTERVAL = time.Second
        URL_CHECK_HEALTH = "url.check.health"
        NET_PORT_LISTEN  = "net.port.listen"
        DU_BS            = "du.bs"
        PROC_NUM         = "proc.num"
        UPTIME           = "sys.uptime"
    )
    
  • 服务端如何开启降级开关: 在 hbs http 接口开启
  • 客户端如何 check 要不要降级: 只需 check 版本是否统一和是否在降级过程中
  • 服务端如何做到并发管制: 只需查看 redis 降级队列在降级的数量和预设的阈值比照
  • 管理员如何发动降级: 只须要给 hbs 发动 http 申请关上降级开关

    curl -X POST   http://127.0.0.1:6031/agent/upgrade -d '{"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.1","binfile_md5":"35ac8534c0b31237e844ef8ee2bb9b9e"}'
  • 管理员
  • 如何进步降级并发: 其实就是如何让下载并发更高,即采纳大带宽 nginx 或 cdn 或者给不同批次 agent 发送不同下载 cdn 地址

毛病

  • 整体实现还比拟毛糙没有笼罩灰度和回滚(只能再发上一个版本)
  • 过程中没有直观的降级进度展现(只能通过 hbs 接口获取 agent 版本做求和)

过程阐明:

http-req --->hbs ---> 开启降级开关 ---> 查看 agent 心跳信息中版本号,并查看以后 hbs 降级队列 ---> 发送降级指令给 agent ---> agent 通过 降级命令中的 url 地址和指标版本号下载新的二进制(会有备份和回滚逻辑)--->agent check 没有问题后获取本身的 pid 向本人发送 kill 信号 --->agent 退出而后会被 systemd 拉起打到降级的目标 ---> 新的心跳信息中版本 checkok 不会持续降级

降级举例说明:

1. falcon-agent 新加了采集指标,测试 OK 后在代码中打上新的版本号比方 6.0.0(现有是 6.0.1)2. 而后将新版 放到下载服务器的门路下  wget http://${your_cdn_addr}/file/open-falcon/bin_6.0.1
3. 而后向 hbs 发送降级的 http 申请(这里有个爱护机制: 只能在 hbs 本机发动)4. 而后通过 hbs 的 http 接口查问以后心跳上来的 agent 的版本查看降级进度,curl -s http://localhost:6031/agentversions |python -m "json.tool"
5. 同时须要连贯的 redis 集群察看 agent_upgrade_set 这个 set 的值,redis-cli -h ip -p port -c smembers agent_upgrade_set & scard agent_upgrade_set
6. 目前看并发 2000 能够把一台下载的 nginx 万兆网卡流量打满。1.24GB/s

## falcon-agent 自降级命令
curl -X POST   http://127.0.0.1:6031/agent/upgrade -d '{"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.1","binfile_md5":"35ac8534c0b31237e844ef8ee2bb9b9e"}'

curl -X GET  http://127.0.0.1:6031/agent/upgrade/nowargs
{"msg":"success","data":{"type":0,"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.1","binfile_md5":"35ac8534c0b31237e844ef8ee2bb9b9e","cfgfile_md5":""}}

curl http://127.0.0.1:6031/agentversions
{"msg":"success","data":{"n3-021-225":"6.0.1"}}

curl -X DELETE  http://127.0.0.1:6031/agent/upgrade
{"msg":"success","data":"勾销降级胜利"}

  uri:
    url: http://127.0.0.1:6031/agent/upgrade
    method: POST
    body: {"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.2","binfile_md5":"f5c597f15e379a77d1e2ceeec7bd99a8"}
    status_code: 200
    body_format: json

正文完
 0