关于分布式:ScheduleMaster分布式任务调度中心基本使用和原理

31次阅读

共计 10345 个字符,预计需要花费 26 分钟才能阅读完成。

@[toc]

一、ScheduleMaster 外围概念

  • 概念
    对立执多个零碎的工作【回收超时订单,清理垃圾信息】,如图:

二、ScheduleMaster 利用场景

  • 场景
    次要利用在微服务零碎中。
    如图:

三、ScheduleMaster 我的项目落地

  • 工具

    • ScheduleMaster
      网盘下载地址:
      链接:https://pan.baidu.com/s/1LcCH…
      提取码:eyup
    • Demo 我的项目
  • 步骤

    • 运行 ScheduleMaster

      • 启动命令

        # 进入 Hos.ScheduleMaster.Web\bin\Release\netcoreapp3.1\publish 目录中启动
        #备注:默认数据库为 sqlserver,如果其余数据库须要在 appsettings.json 中批改数据库类型和数据库连贯地址
          dotnet Hos.ScheduleMaster.Web.dll
        #进入 Hos.ScheduleMaster.QuartzHost\bin\Release\netcoreapp3.1\publish 目录中启动
          dotnet Hos.ScheduleMaster.QuartzHost.dll --urls http://*:30001

        运行后果如图:

        浏览器运行后果:http://localhost:30000,用户名:admin 明码:111111
        如图:

    • Demo 我的项目

      • 新建一个订单回收 API 接口

          private readonly ILogger<HomeController> logger;
          public HomeController(ILogger<HomeController> _logger) {logger = _logger;} 
          /// <summary>
          /// 超时订单回收接口
          /// </summary>
          /// <returns></returns>
          [HttpPost]
          public IActionResult OrderCancel() 
          {logger.LogInformation("回收订单工作");
              return Ok("回收订单工作");
          }
    • 新建工作

      • 点击工作列表
        如图:
      • 点击创立工作按钮 -> 抉择“根底信息配置”
        如图:


        抉择“元数据配置”,在点击“保留”即可, 如图:

      • 运行后果(每隔 5 秒进行调用订单回收接口),如图:

四、ScheduleMaster 运行原理

  • 原理

    • Master 概念
      主节点:协调 Hos.ScheduleMaster.Web
    • Node 概念
      工作节点:执行业务 Hos.ScheduleMaster.QuartzHost
    • 数据库
      用来存储工作信息。
    • 全局架构
      如图:
    • 执行过程
      客户端 —->Hos.ScheduleMaster.Web(master 节点)—->Hos.ScheduleMaster.QuartzHost(工作节点)—-> 订单回收接口

      • master 节点的外围

        • 抉择工作节点
        • 指定工作节点,执行工作
      • 工作节点的外围

        • 取出工作配置信息
        • 应用 Quartz 依据配置运行工作

          • 应用 HttpClient 调用接口
          • 应用反射调用程序集办法

            五、ScheduleMaster 程序集工作

  • 工具

    • 控制台 Demo 我的项目
    • ScheduleMaster
  • 步骤

    • 新建一个 Console 我的项目

        // 我的项目装置  
        ScheduleMaster

      新建 OrderServer 类继承

         public  class OrderServer:TaskBase
         {public override void Run(TaskContext context)
           {
              // 超时订单逻辑
              context.WriteLog("订单开始回收....... 胜利");
           }
         }
      • 控制台我的项目打包
        我的项目编译后,进入 bin 文件件,将 Hos.ScheduleMaster.Base.dll 除外的所有的文件打包成压缩文件,如图:
    • ScheduleMaster 配置

      • 点击“工作列表”目录,点击“创立工作”按钮,工作类型抉择“程序集工作”,如图:

六、ScheduleMaster API 接口工作(应用代码自定义创立工作)

  • 实现代码如下:

          /// <summary>
          /// 通过代码创立工作
          /// </summary>
          /// <returns></returns>
          [HttpPost("CreateTask")]
          public async Task<IActionResult> CreateTask()
          {HttpClient client = new HttpClient();
              // 登录 设置用户名和明码
              client.DefaultRequestHeaders.Add("ms_auth_user", "admin");
              client.DefaultRequestHeaders.Add("ms_auth_secret", MD5($"admin{MD5("111111")}admin"));
              List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();
              args.Add(new KeyValuePair<string, string>("MetaType", "2"));
              args.Add(new KeyValuePair<string, string>("RunLoop", "true"));
              args.Add(new KeyValuePair<string, string>("CronExpression", "0/5 * * * * ?"));
              args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created"));
              args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss")));
              args.Add(new KeyValuePair<string, string>("Title", "order_cancel_http_api"));
              args.Add(new KeyValuePair<string, string>("HttpRequestUrl", "http://localhost:5000/Order"));
              args.Add(new KeyValuePair<string, string>("HttpMethod", "POST"));
              args.Add(new KeyValuePair<string, string>("HttpContentType", "application/json"));
              args.Add(new KeyValuePair<string, string>("HttpHeaders", "[]"));
              args.Add(new KeyValuePair<string, string>("HttpBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\"}], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\"}"));
              HttpContent reqContent = new FormUrlEncodedContent(args);
              var response = await client.PostAsync("http://localhost:30000/api/Task/Create", reqContent);
              var content = await response.Content.ReadAsStringAsync();
              logger.LogInformation(content); 
              return Ok("创立胜利"); 
          } 
          /// <summary>
          /// MD5 加密
          /// </summary>
          /// <param name="prestr"></param>
          /// <returns></returns>
          public static string MD5(string prestr)
          {StringBuilder sb = new StringBuilder(32);
              MD5 md5 = new MD5CryptoServiceProvider();
              byte[] t = md5.ComputeHash(Encoding.GetEncoding("UTF-8").GetBytes(prestr));
              for (int i = 0; i < t.Length; i++)
              {sb.Append(t[i].ToString("x").PadLeft(2, '0'));
              }
              return sb.ToString();} 
  • 官网 API

    • API Server 对接流程
      对于凋谢接口来说,应用签名验证曾经是必不可少的一环,这是保证系统安全性的重要伎俩。看一下外围对接流程:

      • 在控制台中创立好专用的 API 对接用户账号。
      • 应用对接账号的用户名设置为 http header 中的 ms_auth_user 值。
      • 应用通过哈希运算过的秘钥设置为 http header 中的 ms_auth_secret 值,计算规定:按{用户名}{hash(明码)}{用户名} 的格局拼接失去字符串 str,而后再对 str 做一次 hash 运算即失去最终秘钥,hash 函数是小写的 32 位 MD5 算法。
      • 应用 form 格局发动 http 调用,如果非法用户会返回 401-Unauthorized。
        代码示例:

          HttpClient client = new HttpClient();
          client.DefaultRequestHeaders.Add("ms_auth_user", "admin");
          client.DefaultRequestHeaders.Add("ms_auth_secret", SecurityHelper.MD5($"admin{SecurityHelper.MD5("111111")}}admin"));

        签名验证这块设计的比较简单,具体源码逻辑能够参考Hos.ScheduleMaster.Web.Filters.AccessControlFilter

    • API 返回格局
      所有接口采纳对立的返回格局,字段如下:

      参数名称 参数类型 阐明
      Success bool 是否胜利
      Status int 后果状态,0- 申请失败 1- 申请胜利 2- 登录失败 3- 参数异样 4- 数据异样
      Message string 返回的音讯
      Data object 返回的数据
    • 创立程序集工作

      • 接口地址:http://yourip:30000/api/task/create
      • 申请类型:POST
      • 参数格局:application/x-www-form-urlencoded
      • 返回后果:创立胜利返回工作 id
      • 参数列表:
      参数名称 参数类型 是否必填 阐明
      MetaType int 工作类型,这里固定是 1
      Title string 工作名称
      RunLoop bool 是否按周期执行
      CronExpression string cron 表达式,如果 RunLoop 为 true 则必填
      AssemblyName string 程序集名称
      ClassName string 执行类名称,蕴含残缺命名空间
      StartDate DateTime 工作开始工夫
      EndDate DateTime 工作进行工夫,为空示意不限进行工夫
      Remark string 工作形容阐明
      Keepers List<int> 监护人 id
      Nexts List<guid> 子级任务 id
      Executors List<string> 执行节点名称
      RunNow bool 创立胜利是否立刻启动
      Params List<ScheduleParam> 自定义参数列表,也能够通过 CustomParamsJson 字段间接传 json 格局字符串

      ScheduleParam:

      参数名称 参数类型 是否必填 阐明
      ParamKey string 参数名称
      ParamValue string 参数值
      ParamRemark string 参数阐明

      代码示例:

          HttpClient client = new HttpClient();
          List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();
          args.Add(new KeyValuePair<string, string>("MetaType", "1"));
          args.Add(new KeyValuePair<string, string>("RunLoop", "true"));
          args.Add(new KeyValuePair<string, string>("CronExpression", "33 0/8 * * * ?"));
          args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created"));
          args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss")));
          args.Add(new KeyValuePair<string, string>("Title", "程序集接口测试工作"));
          args.Add(new KeyValuePair<string, string>("AssemblyName", "Hos.ScheduleMaster.Demo"));
          args.Add(new KeyValuePair<string, string>("ClassName", "Hos.ScheduleMaster.Demo.Simple"));
          args.Add(new KeyValuePair<string, string>("CustomParamsJson", "[{\"ParamKey\":\"k1\",\"ParamValue\":\"1111\",\"ParamRemark\":\"r1\"},{\"ParamKey\":\"k2\",\"ParamValue\":\"2222\",\"ParamRemark\":\"r2\"}]"));
          args.Add(new KeyValuePair<string, string>("Keepers", "1"));
          args.Add(new KeyValuePair<string, string>("Keepers", "2"));
          //args.Add(new KeyValuePair<string, string>("Nexts", ""));
          //args.Add(new KeyValuePair<string, string>("Executors", ""));
          HttpContent reqContent = new FormUrlEncodedContent(args);
          var response = await client.PostAsync("http://localhost:30000/api/Task/Create", reqContent);
          var content = await response.Content.ReadAsStringAsync();
          Debug.WriteLine(content);

      要提一下的是,应用 API 创立工作的形式不反对上传程序包,所以在工作须要启动时要确保程序包已通过其余形式上传,否则会启动失败。

    • 创立 HTTP 工作

      • 接口地址:http://yourip:30000/api/task/create
      • 申请类型:POST
      • 参数格局:application/x-www-form-urlencoded
      • 返回后果:创立胜利返回工作 id
      • 参数列表:
      参数名称 参数类型 是否必填 阐明
      MetaType int 工作类型,这里固定是 2
      Title string 工作名称
      RunLoop bool 是否按周期执行
      CronExpression string cron 表达式,如果 RunLoop 为 true 则必填
      StartDate DateTime 工作开始工夫
      EndDate DateTime 工作进行工夫,为空示意不限进行工夫
      Remark string 工作形容阐明
      HttpRequestUrl string 申请地址
      HttpMethod string 申请形式,仅反对 GET\POST\PUT\DELETE
      HttpContentType string 参数格局,仅反对 application/json 和 application/x-www-form-urlencoded
      HttpHeaders string 自定义申请头,ScheduleParam 列表的 json 字符串
      HttpBody string 如果是 json 格局参数,则是对应参数的 json 字符串;如果是 form 格局参数,则是对应 ScheduleParam 列表的 json 字符串。
      Keepers List<int> 监护人 id
      Nexts List<guid> 子级任务 id
      Executors List<string> 执行节点名称
      RunNow bool 创立胜利是否立刻启动

      代码示例:

          HttpClient client = new HttpClient();
          List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();
          args.Add(new KeyValuePair<string, string>("MetaType", "2"));
          args.Add(new KeyValuePair<string, string>("RunLoop", "true"));
          args.Add(new KeyValuePair<string, string>("CronExpression", "22 0/8 * * * ?"));
          args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created"));
          args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss")));
          args.Add(new KeyValuePair<string, string>("Title", "Http 接口测试工作"));
          args.Add(new KeyValuePair<string, string>("HttpRequestUrl", "http://localhost:56655/api/1.0/value/jsonpost"));
          args.Add(new KeyValuePair<string, string>("HttpMethod", "POST"));
          args.Add(new KeyValuePair<string, string>("HttpContentType", "application/json"));
          args.Add(new KeyValuePair<string, string>("HttpHeaders", "[]"));
          args.Add(new KeyValuePair<string, string>("HttpBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\"}], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\"}"));
          HttpContent reqContent = new FormUrlEncodedContent(args);
          var response = await client.PostAsync("http://localhost:30000/api/Task/Create", reqContent);
          var content = await response.Content.ReadAsStringAsync();
          Debug.WriteLine(content);
    • 创立延时工作

      • 接口地址:http://yourip:30000/api/delaytask/create
      • 申请类型:POST
      • 参数格局:application/x-www-form-urlencoded
      • 返回后果:创立胜利返回工作 id
      • 参数列表:
      参数名称 参数类型 是否必填 阐明
      SourceApp string 起源
      Topic string 主题
      ContentKey string 业务关键字
      DelayTimeSpan int 提早绝对工夫
      DelayAbsoluteTime DateTime 提早相对工夫
      NotifyUrl string 回调地址
      NotifyDataType string 回调参数格局,仅反对 application/json 和 application/x-www-form-urlencoded
      NotifyBody string 回调参数,json 格局字符串

      代码示例:

          for (int i = 0; i < 5; i++)
          {int rndNum = new Random().Next(20, 500);
              List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();
              args.Add(new KeyValuePair<string, string>("SourceApp", "TestApp"));
              args.Add(new KeyValuePair<string, string>("Topic", "TestApp.Trade.TimeoutCancel"));
              args.Add(new KeyValuePair<string, string>("ContentKey", i.ToString()));
              args.Add(new KeyValuePair<string, string>("DelayTimeSpan", rndNum.ToString()));
              args.Add(new KeyValuePair<string, string>("DelayAbsoluteTime", DateTime.Now.AddSeconds(rndNum).ToString("yyyy-MM-dd HH:mm:ss")));
              args.Add(new KeyValuePair<string, string>("NotifyUrl", "http://localhost:56655/api/1.0/value/delaypost"));
              args.Add(new KeyValuePair<string, string>("NotifyDataType", "application/json"));
              args.Add(new KeyValuePair<string, string>("NotifyBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\"}], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\"}"));
              HttpContent reqContent = new FormUrlEncodedContent(args);
              var response = await client.PostAsync("http://localhost:30000/api/DelayTask/Create", reqContent);
              var content = await response.Content.ReadAsStringAsync();
              Debug.WriteLine(content);
          }

      七、ScheduleMaster 集群和集群原理

  • 次要针对工作节点应用集群

    • 步骤

      • 进入工作节点我的项目根目录下【ScheduleMasterCore\Hos.ScheduleMaster.QuartzHost\bin\Release\netcoreapp3.1\publish】,更改配置文件【appsettings.json】,因为要启动多个节点,只须要更改节点名称和端口号即可【切记:节点名称不可能反复】,如下:

          "NodeSetting": {
            "IdentityName": "worker1",  // 节点名称
            "Role": "worker",
            "Protocol": "http",
            "IP": "localhost",
            "Port": 30001,  // 端口号
            "Priority": 1,
            "MaxConcurrency": 20
          }
      • 启动

          #进入 Hos.ScheduleMaster.QuartzHost\bin\Release\netcoreapp3.1\publish 目录中启动
          dotnet Hos.ScheduleMaster.QuartzHost.dll --urls http://*:30002
      • 运行后果,如图:
  • 场景

    • 如果以后的工作节点宕机了,会不会转移到其余的工作节点上?
      须要更改工作的执行节点,抉择多个执行节点,如图:

      有两个工作节点,30001 和 30002,其中把 30001 宕机了[有个心跳检测的 API 接口,定时工作一直的检测以后的节点是否可用],会间接转移到其余节点执行工作,运行后果如下:

正文完
 0