乐趣区

关于python3.x:fastapi-框架-实现接口与zabbix自定义监控

前情与试验目标 

背景:服务器早晨负载常常忽然暴涨,只有 node 服务,然而双机另一台无问题

解决办法:

通过 python fastapi 实现 load,pm2 信息,cpu 等接口,自定义 zabbix 脚本监控报警

通过 fastapi 实现 load,pm2 信息,cpu 等接口


#!/usr/bin/python
# -*- coding:utf-8 -*-

import subprocess
import urllib.request

from fastapi import FastAPI
import platform
import socket,requests
from ansible2 import *
import ansible_runner
import os, sys, json, datetime, time
import urllib.request
from fastapi.responses import HTMLResponse

from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import Response
from fastapi import FastAPI, Form
from fastapi import Cookie
from starlette.templating import Jinja2Templates
from starlette.staticfiles import StaticFiles
from utils import sqlhelper
import pymysql



def ansible_linux_command(hosts1,cmd1):
    """Run a shell command on one host through Ansible.

    Args:
        hosts1: Host pattern handed to Ansible; it is also the key used to
            look the host up in the "success" section of the result.
        cmd1: Command line passed to the Ansible ``shell`` module.

    Returns:
        The part of the command's stdout that precedes the first newline.

    Raises:
        KeyError: If ``hosts1`` has no entry in the "success" section.
    """
    runner = MyAnsiable2(inventory='/data/ansible/host/hosts', connection='smart')
    runner.run(hosts=hosts1, module="shell", args=cmd1)

    # get_result() hands back a JSON string; decode it into a dict.
    outcome = json.loads(runner.get_result())
    print(outcome, type(outcome))

    host_stdout = outcome['success'][hosts1]['stdout']
    print(host_stdout)

    # Only the text before the first newline is returned to the caller.
    first_line, _, _ = host_stdout.partition("\n")
    return first_line





def ansible_load(hosts1):
    """Collect the load average of a host, plus diagnostics when it is high.

    Args:
        hosts1: Host to query; passed unchanged to ``ansible_linux_command``.

    Returns:
        A dict with the key "load" (the load average as the string printed by
        the remote command). When the load is above 10.00 the dict also holds
        "pm2" (output of ``pm2 ls``) and "cpu" (the first ten lines of ``ps aux``
        sorted on column 3), each flattened to a single line.

    Raises:
        ValueError: If the remote command output cannot be parsed as a float.
    """
    # The load averages are the last three fields of the uptime line, so
    # $(NF-1) selects the 5-minute value regardless of how the "up ..." part
    # is formatted; a fixed field number only works for one uptime layout.
    # tr needs a space between -d and the character set it deletes.
    load_cmd = "uptime | awk '{print $(NF-1)}' | tr -d ','"

    # Query the load once and reuse it, so the threshold check and the
    # reported value always agree and no extra remote run is needed.
    load_text = ansible_linux_command(hosts1, load_cmd)
    stdout_list2 = {"load": load_text}

    if float(load_text) > 10.00:
        # ansible_linux_command keeps only the first output line, so newlines
        # are replaced by spaces to keep the whole listing (tr requires a
        # non-empty replacement set).
        stdout_list2["pm2"] = ansible_linux_command(
            hosts1, "pm2 ls | tr '\\n' ' '")
        stdout_list2["cpu"] = ansible_linux_command(
            hosts1, "ps aux | grep -v PID | sort -rn -k +3 | head | tr '\\n' ' '")

    return stdout_list2



# The FastAPI application object that the route decorator below and the
# uvicorn entry point refer to as ``app``; nothing else in this script
# creates it, so without this line the module fails with a NameError.
app = FastAPI()


@app.get("/load/{hosts1}")
def read_load(hosts1: str):
    """Return the load information of ``hosts1`` for GET /load/{hosts1}.

    Args:
        hosts1: Host taken from the URL path and passed to ``ansible_load``.

    Returns:
        The dict produced by ``ansible_load``.
    """
    # NOTE(review): hosts1 comes straight from the URL and is used as an
    # Ansible host pattern; consider validating it against known hosts.
    print(hosts1, '#######################hosts')

    # Collect once and reuse the result: every ansible_load call triggers
    # remote command runs, and a second call may return different values
    # from the ones that were logged.
    load_info = ansible_load(hosts1)
    print(load_info)
    return load_info



if __name__ == '__main__':
    # Serve the API with uvicorn when this file is executed as a script.
    import uvicorn

    server_options = {
        "host": "192.168.0.215",
        "port": 9999,
        "workers": 1,
    }
    uvicorn.run(app=app, **server_options)


ansible 模块


[root@dev-technology-215l fastapi_websocket_logs]# cat ansible2.py 
import json
import shutil
from ansible.module_utils.common.collections import ImmutableDict
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
from ansible import context
import ansible.constants as C


class ResultCallback(CallbackBase):
    """Callback that overrides a few CallbackBase hooks to store task results.

    Each hook stores the result object under the host's name, so a later
    result for the same host replaces the earlier one.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # One dict per outcome, keyed by host name.
        self.host_ok = {}
        self.host_failed = {}
        self.host_unreachable = {}
        # Not written by any hook in this class.
        self.task_ok = {}

    def v2_runner_on_ok(self, result, **kwargs):
        host_name = result._host.get_name()
        self.host_ok[host_name] = result

    def v2_runner_on_failed(self, result, **kwargs):
        host_name = result._host.get_name()
        self.host_failed[host_name] = result

    def v2_runner_on_unreachable(self, result):
        host_name = result._host.get_name()
        self.host_unreachable[host_name] = result

class MyAnsiable2():
    """Small wrapper around the Ansible Python API.

    ``run`` executes a single ad-hoc module call, ``playbook`` executes
    playbooks, and ``get_result`` returns what the result callback collected
    as a JSON string.
    """

    def __init__(self,
        connection='local',  # connection type: 'local' runs locally, 'smart' uses ssh
        remote_user=None,    # ssh user
        remote_password=None,  # ssh password; should be a dict whose key is conn_pass
        private_key_file=None,  # path of a custom private key
        sudo=None, sudo_user=None, ask_sudo_pass=None,
        module_path=None,    # module path; may point at a custom module directory
        become=None,         # whether to escalate privileges
        become_method=None,  # escalation method, sudo by default, may be su
        become_user=None,  # user to become after escalation (not the login user)
        check=False, diff=False,
        listhosts=None, listtasks=None,listtags=None,
        verbosity=3,
        syntax=None,
        start_at_task=None,
        inventory=None):
        """
        Set up the option values and the Ansible objects used by the other
        methods. Every option has a default that can be overridden by
        passing an argument.

        Note that the options are stored in ``context.CLIARGS``, which is a
        module-level attribute of ``ansible.context``, so creating a new
        instance replaces the options set by a previous one.
        """
        context.CLIARGS = ImmutableDict(
            connection=connection,
            remote_user=remote_user,
            private_key_file=private_key_file,
            sudo=sudo,
            sudo_user=sudo_user,
            ask_sudo_pass=ask_sudo_pass,
            module_path=module_path,
            become=become,
            become_method=become_method,
            become_user=become_user,
            # check and diff are accepted by the constructor, so forward them
            # instead of silently dropping the caller's choice.
            check=check,
            diff=diff,
            verbosity=verbosity,
            listhosts=listhosts,
            listtasks=listtasks,
            listtags=listtags,
            syntax=syntax,
            start_at_task=start_at_task,
        )

        # Fall back to "localhost," when no inventory is given.
        # The inventory may be an inventory file, or (for testing only) a
        # comma-separated host string:
        #   "1.1.1.1,"          a single IP must end with a comma
        #   "1.1.1.1, 2.2.2.2"
        self.inventory = inventory if inventory else "localhost,"

        # Data loader used to parse inventory and play data.
        self.loader = DataLoader()

        # Inventory object built from the chosen sources.
        self.inv_obj = InventoryManager(loader=self.loader, sources=self.inventory)

        # Passwords handed to the task queue manager / playbook executor.
        self.passwords = remote_password

        # Callback plugin instance that collects the results.
        self.results_callback = ResultCallback()

        # Variable manager.
        self.variable_manager = VariableManager(self.loader, self.inv_obj)

    def run(self, hosts='localhost', gether_facts="no", module="ping", args='', task_time=0):
        """
        Run a single ad-hoc module call against ``hosts``.

        Arguments:
        hosts -- host pattern of the play
        gether_facts -- value of the play's ``gather_facts`` setting
        module -- name of the module to run
        args -- arguments passed to the module
        task_time -- seconds to wait for an asynchronous task. It must be
            greater than 0 for asynchronous execution; 0 (the default) means
            no asynchronous support. Ideally it matches the real duration of
            the task.

        The results are not returned; read them with ``get_result``.
        """
        play_source = dict(
            name="Ad-hoc",
            hosts=hosts,
            gather_facts=gether_facts,
            tasks=[
                # Each task is one element of this list, written as a nested
                # dict. Tasks could also be passed in as an argument; this is
                # kept simple on purpose.
                {"action": {"module": module, "args": args}, "async": task_time, "poll": 0}
            ])

        play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)

        tqm = None
        try:
            tqm = TaskQueueManager(
                      inventory=self.inv_obj,
                      variable_manager=self.variable_manager,
                      loader=self.loader,
                      passwords=self.passwords,
                      stdout_callback=self.results_callback)

            tqm.run(play)
        finally:
            # Always release the queue manager and remove Ansible's local
            # temporary directory, even when the run raised.
            if tqm is not None:
                tqm.cleanup()
            shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

    def playbook(self,playbooks):
        """
        Run playbooks and collect their results in the result callback.

        Keyword arguments:
        playbooks --  must be a list
        """
        from ansible.executor.playbook_executor import PlaybookExecutor

        executor = PlaybookExecutor(playbooks=playbooks,
                        inventory=self.inv_obj,
                        variable_manager=self.variable_manager,
                        loader=self.loader,
                        passwords=self.passwords)

        # Route task results to our callback.
        executor._tqm._stdout_callback = self.results_callback

        executor.run()

    def get_result(self):
        """
        Return the collected results as a JSON string.

        The JSON object has the keys "success", "failed" and "unreachable",
        each mapping host names to the raw result of that host. The same
        data is also printed, indented, to stdout.
        """
        result_raw = {'success':{},'failed':{},'unreachable':{}}

        for host,result in self.results_callback.host_ok.items():
            result_raw['success'][host] = result._result
        for host,result in self.results_callback.host_failed.items():
            result_raw['failed'][host] = result._result
        for host,result in self.results_callback.host_unreachable.items():
            result_raw['unreachable'][host] = result._result

        # Print the final result, formatted as indented JSON.
        print(json.dumps(result_raw, indent=4))

        return json.dumps(result_raw)


测试


[root@dev-technology-215l fastapi_websocket_logs]# curl -s http://192.168.0.215:9999/load/172.16.19.43
{"load":"9.57"}

[root@dev-technology-215l fastapi_websocket_logs]# pwd
/data/shell/fastapi_websocket_logs

zabbix 自定义监控脚本

编写 load_monitor.py 


[root@sit-cdpapp-162l zabbix]# cat load_monitor.py 
#!/usr/bin/python
# -*- coding:utf-8 -*-

import subprocess
import os,sys,json,datetime,time
import locale
import re
import requests


# Host to query, taken from the first command-line argument.
host2 = sys.argv[1]

# Ask the load API about that host, waiting at most 10 seconds.
url = 'http://192.168.0.215:9999/load/{thost}'.format(thost=host2)
response = requests.get(url, timeout=10)

# Write the raw response body to stdout.
print(response.text)

修改 /etc/zabbix/zabbix_agentd.conf 文件




[root@sit-cdpapp-162l zabbix]# grep  -v  "#"  /etc/zabbix/zabbix_agentd.conf

PidFile=/var/run/zabbix/zabbix_agentd.pid

LogFile=/var/log/zabbix/zabbix_agentd.log
LogFileSize=0

Server=192.168.0.12
ServerActive=192.168.0.12
Hostname=sit-spring-app162
Timeout=10
Include=/etc/zabbix/zabbix_agentd.d/

UnsafeUserParameters=1

UserParameter=process.all[*],/etc/zabbix/processstatus.sh $1 $2
UserParameter=java_monitor[*],/etc/zabbix/java_monitor.py $1
UserParameter=cdp-java_monitor[*],/etc/zabbix/cdp-java_monitor.py $1
UserParameter=node_monitor[*],/etc/zabbix/node_monitor.py $1 $2
UserParameter=load_monitor[*],/etc/zabbix/load_monitor.py $1
UserParameter=pro_elk_port[*],/etc/zabbix/elk_socket_port.py $1 $2
UserParameter=node_monitor2[*],/etc/zabbix/node_monitor-nodomain.py $1 $2
UserParameter=nginx_check_upstream[*],/etc/zabbix/nginx_check_upstream.py $1 $2

退出移动版