"""Scheduled health checks for data-refresh tasks.

Every entry in ``my_str`` describes one monitored item. Depending on its
``type`` the item is checked once a day (at its ``schedule`` time) either
against the task-status tables in MySQL or against a Tableau flow through
the REST API wrapper.
"""
import json
import time
from datetime import datetime

import pymysql.cursors
import requests
from apscheduler.schedulers.blocking import BlockingScheduler

from tableau_rest_api import TableauRestApi
from read_config import get_value, REST_API_VERSION
from log_utils import logger, logger_no_found, logger_error

# Upper bound (seconds) for the config-service HTTP call. Without a timeout
# ``requests`` waits indefinitely and a stalled service would block the job.
REST_TIMEOUT_SECONDS = 30

# Monitored items. ``name``/``title`` are passed to the check functions,
# ``type`` selects the check, ``schedule`` is the daily HH:MM:SS trigger time.
my_str = '''[
    {
        "name": "SBJ_CON_BASE_INFO_PRO",
        "details": "销售明细表",
        "title": "xiao shou ming xi biao",
        "type":"mysql",
        "schedule": "07:15:20"
    },
    {
        "name": "RM_PH_ENR_DT",
        "details": "1.1.2余额明细",
        "title": "1.1.2 yu e ming xi",
        "type":"mysql",
        "schedule": "13:15:20"
    },
    {
        "name": "DMM_INNO_EPO_ENR_DTL",
        "details": "EPO及余额明细",
        "title": "EPO ji yu e ming xi",
        "type":"mysql",
        "schedule": "12:15:20"
    },
    {
        "name": "DMM_INNO_CUST_COMPLAINT_BOARD",
        "details": "客诉监控看板模型",
        "title": "ke su jian kong kan ban",
        "type":"mysql",
        "schedule": "07:45:20"
    },
    {
        "name": "销售明细Flow",
        "title": "xiao shou ming xi flow",
        "details": "销售明细Flow",
        "type":"restApi",
        "schedule": "07:30:20"
    }
]'''

# Parameterized so the driver quotes/escapes the values instead of the
# statement being assembled with str.format().
_TASK_STATUS_SQL = '''
    select b.name, b.details, a.date_updated, a.STAT_MSG
    from bdsp_tab_task_stat a
    inner join bdsp_tab_task b
        on a.id_bdsp_tab_task = b.id_bdsp_tab_task
        and b.name = %s
        and DATE(a.DATE_UPDATED) = %s
'''


def _today() -> str:
    """Return the current local date formatted as YYYY-MM-DD."""
    return time.strftime('%Y-%m-%d')


def getRestInfo() -> dict:
    """Fetch the remote configuration and return it as a name -> content dict.

    Issues a GET against the URL configured under ``REST_URL``, logs the raw
    response text, and flattens the ``data`` list of the JSON payload into a
    dictionary keyed by each entry's ``name``.

    Raises:
        requests.exceptions.RequestException: if the request fails or does
            not complete within ``REST_TIMEOUT_SECONDS``.
        ValueError: if the response body is not valid JSON.
        KeyError: if the payload lacks ``data`` or an entry lacks
            ``name``/``content``.
    """
    rest_url = get_value('REST_URL')
    response = requests.get(rest_url, timeout=REST_TIMEOUT_SECONDS)
    logger.info(response.text)
    payload = json.loads(response.text)
    return {entry['name']: entry['content'] for entry in payload['data']}


def fetchServerInfo(tableName, tableTitle, data_time):
    """Check the task status recorded in MySQL for one table on one date.

    Looks up the row joining ``bdsp_tab_task_stat`` with ``bdsp_tab_task``
    for ``tableName`` whose ``DATE_UPDATED`` falls on ``data_time``, then:

    * no row found            -> ``logger_no_found(tableName, tableTitle)``
    * ``STAT_MSG`` != SUCCESS -> ``logger_error(tableName, tableTitle)``
    * otherwise               -> an info log line with the status

    Args:
        tableName: task name matched against ``bdsp_tab_task.name``.
        tableTitle: title forwarded to the logging helpers.
        data_time: date string compared with ``DATE(a.DATE_UPDATED)``.
    """
    connection = pymysql.connect(host=get_value('MYSQL_HOST'),
                                 port=int(get_value('MYSQL_PORT')),
                                 database=get_value('MYSQL_DBNAME'),
                                 user=get_value('MYSQL_USER'),
                                 password=get_value('MYSQL_PASSWORD'),
                                 cursorclass=pymysql.cursors.DictCursor,
                                 charset='utf8')
    # Close explicitly: this runs on every scheduled job, so an unclosed
    # connection would accumulate for as long as the scheduler is alive.
    try:
        with connection.cursor() as cursor:
            cursor.execute(_TASK_STATUS_SQL, (tableName, data_time))
            row = cursor.fetchone()
    finally:
        connection.close()

    logger.info(row)
    if row is None:
        logger_no_found(tableName, tableTitle)
        return

    if row['STAT_MSG'] != 'SUCCESS':
        logger_error(tableName, tableTitle)
    else:
        logger.info('{} -- {} -- task status :{}'.format(data_time, tableName, row['STAT_MSG']))


def just_echo():
    """Write a timestamped separator line to the log at error level."""
    dashes = '-' * 10
    msg = '{} {} {}'.format(dashes, time.strftime("%Y-%m-%d %H:%M:%S"), dashes)
    logger.error(msg)


def checkMysqlStatus(**options):
    """Scheduler job: check today's MySQL task status for one config entry.

    Args:
        **options: one entry of ``my_str``; ``name`` and ``title`` are used.
    """
    fetchServerInfo(options["name"], options["title"], _today())


def checkRestApiStatus(**options):
    """Scheduler job: check today's run status of one Tableau flow.

    Server URL, token name, token secret and site are all read from the
    remote configuration returned by ``getRestInfo``; credentials must not
    be hard-coded in this file.

    Args:
        **options: one entry of ``my_str``; ``name`` and ``title`` are used.
    """
    remote_config = getRestInfo()
    server = TableauRestApi(REST_API_VERSION, remote_config['TABLEAU_SERVER'])
    server.sign_in(personal_access_token_name=remote_config['TABLEAU_USER'],
                   personal_access_token_secret=remote_config['TABLEAU_PASSWORD'],
                   site_name=remote_config['TABLEAU_SITE'])

    # NOTE(review): "quert_flow_for_user" is the name this code has always
    # called on the wrapper; it looks like a typo of "query" - confirm
    # against tableau_rest_api before renaming.
    target_flow = server.quert_flow_for_user(options["name"])
    logger.info(target_flow)
    if target_flow is None:
        logger_no_found(options["name"], options["title"])
        return

    server.get_flow_run_status(target_flow, options['title'], _today())


def main():
    """Register all jobs and run the blocking scheduler."""
    scheduler = BlockingScheduler()
    # Separator line at second 0 of every minute.
    scheduler.add_job(just_echo, 'cron', second='0')

    for entry in json.loads(my_str):
        logger.info(entry)
        run_at = datetime.strptime(entry['schedule'], '%H:%M:%S')
        # Anything that is not a "mysql" entry is checked through the REST API.
        job = checkMysqlStatus if entry['type'] == 'mysql' else checkRestApiStatus
        scheduler.add_job(job, 'cron',
                          hour=run_at.hour,
                          minute=run_at.minute,
                          second=run_at.second,
                          kwargs=entry)

    scheduler.start()


if __name__ == '__main__':
    main()