启动

  • make run 调用 ./bin/apisix start
  • 寻找 juajit 门路 运行 /usr/local/Cellar/openresty/1.19.9.1_2/luajit/bin/luajit ./apisix/cli/apisix.lua start
  • 调用 ops.lua 里的 start 办法,初始化配置,ETCD,执行openresty 启动命令

    local function start(env, ...)  init(env)  init_etcd(env, args)  util.execute_cmd(env.openresty_args)end
  • 初始化 nginx 配置,通过读取 conf/config.yaml 配合模板 ngx_tpl.lua 生成 nginx config 文件。供 openresty(nginx)应用

     local conf_render = template.compile(ngx_tpl)  local ngxconf = conf_render(sys_conf)  local ok, err = util.write_file(env.apisix_home .. "/conf/nginx.conf",                                  ngxconf)  if not ok then      util.die("failed to update nginx.conf: ", err, "\n")  end
  • 初始化 ETCD,读取ETCD集群配置,进行连贯

    local version_url = host .. "/version"      local errmsg      local res, err      local retry_time = 0      while retry_time < 2 do          res, err = request(version_url, yaml_conf)          -- In case of failure, request returns nil followed by an error message.          -- Else the first return value is the response body          -- and followed by the response status code.          if res then              break          end          retry_time = retry_time + 1          print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",                           version_url, err, retry_time))      end
  • 执行openresty 启动命令 openresty -p /usr/local/apisix -c /conf/nginx.conf

      local openresty_args = [[openresty -p ]] .. apisix_home .. [[ -c ]]                         .. apisix_home .. [[/conf/nginx.conf]]  util.execute_cmd(env.openresty_args)

插件流程

nginx 配置中嵌入的 apisix 流程

 init_by_lua_block {        require "resty.core"        apisix = require("apisix")        local dns_resolver = { "172.19.2.70", "172.19.2.62", }        local args = {            dns_resolver = dns_resolver,        }        apisix.http_init(args)}    init_worker_by_lua_block {        apisix.http_init_worker()}exit_worker_by_lua_block {        apisix.http_exit_worker()}access_by_lua_block {                apisix.http_access_phase()}header_filter_by_lua_block {                apisix.http_header_filter_phase()}body_filter_by_lua_block {                apisix.http_body_filter_phase()}log_by_lua_block {                apisix.http_log_phase()}proxy_pass      $upstream_scheme://apisix_backend$upstream_uri;upstream apisix_backend {        server 0.0.0.1;        balancer_by_lua_block {            apisix.http_balancer_phase()        }        keepalive 320;        keepalive_requests 1000;        keepalive_timeout 60s;}
  • apisix.http_init
    1.设置 dns resolver
    2.设置实例id
    3.启动 privileged agent

      core.resolver.init_resolver(args)  core.id.init()  local process = require("ngx.process")  local ok, err = process.enable_privileged_agent()
  • apisix.http_init_worker

    function _M.http_init_worker()  local seed, err = core.utils.get_seed_from_urandom()  if not seed then      core.log.warn('failed to get seed from urandom: ', err)      seed = ngx_now() * 1000 + ngx.worker.pid()  end  math.randomseed(seed)  -- for testing only  core.log.info("random test in [1, 10000]: ", math.random(1, 10000))  local we = require("resty.worker.events")  local ok, err = we.configure({shm = "worker-events", interval = 0.1})  if not ok then      error("failed to init worker event: " .. err)  end  local discovery = require("apisix.discovery.init").discovery  if discovery and discovery.init_worker then      discovery.init_worker()  end  require("apisix.balancer").init_worker()  load_balancer = require("apisix.balancer")  require("apisix.admin.init").init_worker()  require("apisix.timers").init_worker()  plugin.init_worker()  router.http_init_worker()  require("apisix.http.service").init_worker()  plugin_config.init_worker()  require("apisix.consumer").init_worker()  if core.config == require("apisix.core.config_yaml") then      core.config.init_worker()  end  require("apisix.debug").init_worker()  apisix_upstream.init_worker()  require("apisix.plugins.ext-plugin.init").init_worker()  local_conf = core.config.local_conf()  if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then      ver_header = "APISIX"  endend

1.初始化 openresty worker event

    local we = require("resty.worker.events")    local ok, err = we.configure({shm = "worker-events", interval = 0.1})    if not ok then        error("failed to init worker event: " .. err)    end

2.初始化服务发现
3.初始化balancer组件
4.初始化admin组件(会同步一次插件配置到 etcd)

     sync_local_conf_to_etcd(true)

5.初始化timers组件
6.初始化 plugin 组件(清掉旧的 table, 而后把从config-default.yaml 和 config.yaml 文件中读取插件配置放到一个 local_plugins_hash中,并按优先级排序)

local local_conf_path = profile:yaml_path("config-default")local default_conf_yaml, err = util.read_file(local_conf_path)local_conf_path = profile:yaml_path("config")local user_conf_yaml, err = util.read_file(local_conf_path) ok, err = merge_conf(default_conf, user_conf) local local_plugins = core.table.new(32, 0)  for name in pairs(local_plugins_hash) do        unload_plugin(name)    end    core.table.clear(local_plugins)    core.table.clear(local_plugins_hash)    for name in pairs(processed) do        load_plugin(name, local_plugins)    end    -- sort by plugin's priority    if #local_plugins > 1 then        sort_tab(local_plugins, sort_plugin)    end    local plugin_metadatas, err = core.config.new("/plugin_metadata",        {automatic = true}    )

7.初始化router组件(初始化 etcd /global_rules 数据)

    local global_rules, err = core.config.new("/global_rules", {            automatic = true,            item_schema = core.schema.global_rule,            checker = plugin_checker,        })

8.初始化 servers 组件(从ETCD抓取 services 配置)

services, err = core.config.new("/services", {        automatic = true,        item_schema = core.schema.service,        checker = plugin_checker,        filter = filter,    })

9.初始化sonsumer组件
10.同步 config_yaml 到各个过程

    -- sync data in each non-master process    ngx.timer.every(1, read_apisix_yaml)

11.初始化upstream组件
12.初始化 ext-plugin 组件

  • apisix.http_exit_worker()
    进行 "privileged agent"
  • apisix.http_access_phase()

    function _M.http_access_phase()  local ngx_ctx = ngx.ctx  if not verify_tls_client(ngx_ctx.api_ctx) then      return core.response.exit(400)  end  -- always fetch table from the table pool, we don't need a reused api_ctx  local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)  ngx_ctx.api_ctx = api_ctx  core.ctx.set_vars_meta(api_ctx)  local uri = api_ctx.var.uri  if local_conf.apisix and local_conf.apisix.delete_uri_tail_slash then      if str_byte(uri, #uri) == str_byte("/") then          api_ctx.var.uri = str_sub(api_ctx.var.uri, 1, #uri - 1)          core.log.info("remove the end of uri '/', current uri: ",                        api_ctx.var.uri)      end  end  if router.api.has_route_not_under_apisix() or      core.string.has_prefix(uri, "/apisix/")  then      local skip = local_conf and local_conf.apisix.global_rule_skip_internal_api      local matched = router.api.match(api_ctx, skip)      if matched then          return      end  end  router.router_http.match(api_ctx)  local route = api_ctx.matched_route  if not route then      -- run global rule      plugin.run_global_rules(api_ctx, router.global_rules, nil)      core.log.info("not find any matched route")      return core.response.exit(404,                  {error_msg = "404 Route Not Found"})  end  core.log.info("matched route: ",                core.json.delay_encode(api_ctx.matched_route, true))  local enable_websocket = route.value.enable_websocket  if route.value.plugin_config_id then      local conf = plugin_config.get(route.value.plugin_config_id)      if not conf then          core.log.error("failed to fetch plugin config by ",                          "id: ", route.value.plugin_config_id)          return core.response.exit(503)      end      route = plugin_config.merge(route, conf)  end  if route.value.service_id then      local service = service_fetch(route.value.service_id)      if not service then          core.log.error("failed to fetch service configuration by ",                         "id: ", route.value.service_id)          return core.response.exit(404)      end      route = plugin.merge_service_route(service, route)      api_ctx.matched_route = route      api_ctx.conf_type = "route&service"      api_ctx.conf_version = route.modifiedIndex .. "&" .. service.modifiedIndex      api_ctx.conf_id = route.value.id .. "&" .. service.value.id      api_ctx.service_id = service.value.id      api_ctx.service_name = service.value.name      if enable_websocket == nil then          enable_websocket = service.value.enable_websocket      end  else      api_ctx.conf_type = "route"      api_ctx.conf_version = route.modifiedIndex      api_ctx.conf_id = route.value.id  end  api_ctx.route_id = route.value.id  api_ctx.route_name = route.value.name  -- run global rule  plugin.run_global_rules(api_ctx, router.global_rules, nil)  if route.value.script then      script.load(route, api_ctx)      script.run("access", api_ctx)  else      local plugins = plugin.filter(route)      api_ctx.plugins = plugins      plugin.run_plugin("rewrite", plugins, api_ctx)      if api_ctx.consumer then          local changed          route, changed = plugin.merge_consumer_route(              route,              api_ctx.consumer,              api_ctx          )          core.log.info("find consumer ", api_ctx.consumer.username,                        ", config changed: ", changed)          if changed then              api_ctx.matched_route = route              core.table.clear(api_ctx.plugins)              api_ctx.plugins = plugin.filter(route, api_ctx.plugins)          end      end      plugin.run_plugin("access", plugins, api_ctx)  end  local up_id = route.value.upstream_id  -- used for the traffic-split plugin  if api_ctx.upstream_id then      up_id = api_ctx.upstream_id  end  if up_id then      local upstream = get_upstream_by_id(up_id)      api_ctx.matched_upstream = upstream  else      if route.has_domain then          local err          route, err = parse_domain_in_route(route)          if err then              core.log.error("failed to get resolved route: ", err)              return core.response.exit(500)          end          api_ctx.conf_version = route.modifiedIndex          api_ctx.matched_route = route      end      local route_val = route.value      if route_val.upstream and route_val.upstream.enable_websocket then          enable_websocket = true      end      api_ctx.matched_upstream = (route.dns_value and                                  route.dns_value.upstream)                                 or route_val.upstream  end  if enable_websocket then      api_ctx.var.upstream_upgrade    = api_ctx.var.http_upgrade      api_ctx.var.upstream_connection = api_ctx.var.http_connection      core.log.info("enabled websocket for route: ", route.value.id)  end  if route.value.service_protocol == "grpc" then      api_ctx.upstream_scheme = "grpc"  end  local code, err = set_upstream(route, api_ctx)  if code then      core.log.error("failed to set upstream: ", err)      core.response.exit(code)  end  local server, err = load_balancer.pick_server(route, api_ctx)  if not server then      core.log.error("failed to pick server: ", err)      return core.response.exit(502)  end  api_ctx.picked_server = server  set_upstream_headers(api_ctx, server)  -- run the before_proxy method in access phase first to avoid always reinit request  common_phase("before_proxy")  local ref = ctxdump.stash_ngx_ctx()  core.log.info("stash ngx ctx: ", ref)  ngx_var.ctx_ref = ref  local up_scheme = api_ctx.upstream_scheme  if up_scheme == "grpcs" or up_scheme == "grpc" then      return ngx.exec("@grpc_pass")  end  if api_ctx.dubbo_proxy_enabled then      return ngx.exec("@dubbo_pass")  endend

    1.初始化 api_ctx 上下文
    2.client tls 验证
    3.是否为 apisix 曾经注册的门路,对申请进行匹配,外部应用 Radix tree 进行匹配
    4.向上下文注入该申请匹配的 apisix route ,service 等信息用于后续阶段应用
    5.向上下文注入该申请相干的插件,例如:申请对应的路由存在插件,若申请存在对应的 service 则退出 service 定义的插件,以及全局插件
    6.调用 "rewrite" 阶段的插件
    7.调用 "access" 阶段的插件
    8.获取 upstream
    9.执行 loadbalancer 抉择 server
    10.调用 "balancer" 阶段插件
    11.判断 upstream ,依据 upstream 类型,grpc dubbo 等进入 @grpc_pass @dubbo_pass 等不同的后续解决流程。 这些配置可在 nginx.conf 中查看

  • apisix.http_balancer_phase()

    function _M.http_header_filter_phase()  if ngx_var.ctx_ref ~= '' then      -- prevent for the table leak      local stash_ctx = fetch_ctx()      -- internal redirect, so we should apply the ctx      if ngx_var.from_error_page == "true" then          ngx.ctx = stash_ctx      end  end  core.response.set_header("Server", ver_header)  local up_status = get_var("upstream_status")  if up_status and #up_status == 3     and tonumber(up_status) >= 500     and tonumber(up_status) <= 599  then      set_resp_upstream_status(up_status)  elseif up_status and #up_status > 3 then      -- the up_status can be "502, 502" or "502, 502 : "      local last_status      if str_byte(up_status, -1) == str_byte(" ") then          last_status = str_sub(up_status, -6, -3)      else          last_status = str_sub(up_status, -3)      end      if tonumber(last_status) >= 500 and tonumber(last_status) <= 599 then          set_resp_upstream_status(up_status)      end  end  common_phase("header_filter")end

1.设置头 "Server", APISIX
2.设置上游状态头:X-APISIX-Upstream-Status
3.执行 “header_filter” 阶段的插件

  • apisix.http_body_filter_phase()

    function _M.http_body_filter_phase()  common_phase("body_filter")end

    执行 “body_filter” 阶段的插件

  • apisix.http_log_phase()

    function _M.http_log_phase()  if ngx_var.ctx_ref ~= '' then      -- prevent for the table leak      local stash_ctx = fetch_ctx()      -- internal redirect, so we should apply the ctx      if ngx_var.from_error_page == "true" then          ngx.ctx = stash_ctx      end  end  local api_ctx = common_phase("log")  if not api_ctx then      return  end  healthcheck_passive(api_ctx)  if api_ctx.server_picker and api_ctx.server_picker.after_balance then      api_ctx.server_picker.after_balance(api_ctx, false)  end  if api_ctx.uri_parse_param then      core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param)  end  core.ctx.release_vars(api_ctx)  if api_ctx.plugins then      core.tablepool.release("plugins", api_ctx.plugins)  end  if api_ctx.curr_req_matched then      core.tablepool.release("matched_route_record", api_ctx.curr_req_matched)  end  core.tablepool.release("api_ctx", api_ctx)end

    1.执行 “log” 阶段的插件
    2.回收 plugins, matched_route_record,api_ctx 缓存

  • apisix.http_balancer_phase()

    function _M.http_balancer_phase()  local api_ctx = ngx.ctx.api_ctx  if not api_ctx then      core.log.error("invalid api_ctx")      return core.response.exit(500)  end  load_balancer.run(api_ctx.matched_route, api_ctx, common_phase)end

    1.设置 balance 的基本参数(超时工夫,连贯失败重试次数)
    2.抉择上游服务器,采纳一致性hash算法
    3.调用 set_current_peer 设置 proxy_pass

自定义插件逻辑





通过 common_phase 作为自定义插件的办法的公共入口,被 openresty 各个环节调用

local function common_phase(phase_name)    local api_ctx = ngx.ctx.api_ctx    if not api_ctx then        return    end    plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)    if api_ctx.script_obj then        script.run(phase_name, api_ctx)        return api_ctx, true    end    return plugin.run_plugin(phase_name, nil, api_ctx)end

真正执行自定义插件逻辑

 for i = 1, #plugins, 2 do            local phase_func = plugins[i][phase]            if phase_func then                plugin_run = true                local code, body = phase_func(plugins[i + 1], api_ctx)                if code or body then                    if is_http then                        if code >= 400 then                            core.log.warn(plugins[i].name, " exits with http status code ", code)                        end                        core.response.exit(code, body)                    else                        if code >= 400 then                            core.log.warn(plugins[i].name, " exits with status code ", code)                        end                        ngx_exit(1)                    end                end            end        end

admin api

        location /apisix/admin {            set $upstream_scheme             'http';            set $upstream_host               $http_host;            set $upstream_uri                '';                allow 127.0.0.0/24;                deny all;            content_by_lua_block {                apisix.http_admin()            }        }

自定义插件

入口

local function common_phase(phase_name)    local api_ctx = ngx.ctx.api_ctx    if not api_ctx then        return    end    plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)    if api_ctx.script_obj then        script.run(phase_name, api_ctx)        return api_ctx, true    end    return plugin.run_plugin(phase_name, nil, api_ctx)end

已知的阶段
preread
ssl
access
balancer
rewrite
header_filter
body_filter
log

admin api

入口

        location /apisix/admin {            set $upstream_scheme             'http';            set $upstream_host               $http_host;            set $upstream_uri                '';                allow 127.0.0.0/24;                deny all;            content_by_lua_block {                apisix.http_admin()            }        }

依据路由进行转发

    local ok = router:dispatch(get_var("uri"), {method = get_method()})

具体路由 dispatch 逻辑

function _M.dispatch(self, path, opts, ...)    if type(path) ~= "string" then        error("invalid argument path", 2)    end    local args    local len = select('#', ...)    if len > 0 then        if not self.args then            self.args = {...}        else            clear_tab(self.args)            for i = 1, len do                self.args[i] = select(i, ...)            end        end        -- To keep the self.args in safe,        -- we can't yield until filter_fun is called        args = self.args        args[0] = len    end    local route, err = match_route(self, path, opts or empty_table, args)    if not route then        if err then            return nil, err        end        return nil    end    local handler = route.handler    if not handler or type(handler) ~= "function" then        return nil, "missing handler"    end    handler(...)    return trueend

路由与hanlder 的关系

local uri_route = {    {        paths = [[/apisix/admin/*]],        methods = {"GET", "PUT", "POST", "DELETE", "PATCH"},        handler = run,    },    {        paths = [[/apisix/admin/stream_routes/*]],        methods = {"GET", "PUT", "POST", "DELETE", "PATCH"},        handler = run_stream,    },    {        paths = [[/apisix/admin/plugins/list]],        methods = {"GET"},        handler = get_plugins_list,    },    {        paths = reload_event,        methods = {"PUT"},        handler = post_reload_plugins,    },}

调用 run 办法

local function run()    local api_ctx = {}    core.ctx.set_vars_meta(api_ctx)    ngx.ctx.api_ctx = api_ctx    local ok, err = check_token(api_ctx)    if not ok then        core.log.warn("failed to check token: ", err)        core.response.exit(401)    end    local uri_segs = core.utils.split_uri(ngx.var.uri)    core.log.info("uri: ", core.json.delay_encode(uri_segs))    -- /apisix/admin/schema/route    local seg_res, seg_id = uri_segs[4], uri_segs[5]    local seg_sub_path = core.table.concat(uri_segs, "/", 6)    if seg_res == "schema" and seg_id == "plugins" then        -- /apisix/admin/schema/plugins/limit-count        seg_res, seg_id = uri_segs[5], uri_segs[6]        seg_sub_path = core.table.concat(uri_segs, "/", 7)    end    local resource = resources[seg_res]    if not resource then        core.response.exit(404)    end    local method = str_lower(get_method())    if not resource[method] then        core.response.exit(404)    end    local req_body, err = core.request.get_body(MAX_REQ_BODY)    if err then        core.log.error("failed to read request body: ", err)        core.response.exit(400, {error_msg = "invalid request body: " .. err})    end    if req_body then        local data, err = core.json.decode(req_body)        if not data then            core.log.error("invalid request body: ", req_body, " err: ", err)            core.response.exit(400, {error_msg = "invalid request body: " .. err,                                     req_body = req_body})        end        req_body = data    end    local uri_args = ngx.req.get_uri_args() or {}    if uri_args.ttl then        if not tonumber(uri_args.ttl) then            core.response.exit(400, {error_msg = "invalid argument ttl: "                                                 .. "should be a number"})        end    end    local code, data = resource[method](seg_id, req_body, seg_sub_path,                                        uri_args)    if code then        data = strip_etcd_resp(data)        core.response.exit(code, data)    endend

依据 resource 定义找到对应的 lua 模板的执行办法

local resources = {    routes          = require("apisix.admin.routes"),    services        = require("apisix.admin.services"),    upstreams       = require("apisix.admin.upstreams"),    consumers       = require("apisix.admin.consumers"),    schema          = require("apisix.admin.schema"),    ssl             = require("apisix.admin.ssl"),    plugins         = require("apisix.admin.plugins"),    proto           = require("apisix.admin.proto"),    global_rules    = require("apisix.admin.global_rules"),    stream_routes   = require("apisix.admin.stream_routes"),    plugin_metadata = require("apisix.admin.plugin_metadata"),    plugin_configs  = require("apisix.admin.plugin_config"),}

执行对应模块的对应的办法,例如:
GET http://127.0.0.1:9080/apisix/...
就是间接调用 plugin_metadata 的 get 办法

function _M.get(key)    local path = "/plugin_metadata"    if key then        path = path .. "/" .. key    end    local res, err = core.etcd.get(path, not key)    if not res then        core.log.error("failed to get metadata[", key, "]: ", err)        return 503, {error_msg = err}    end    return res.status, res.bodyend

control api

入口 apisix.http_control()

server {        listen 127.0.0.1:9090;        access_log off;        location / {            content_by_lua_block {                apisix.http_control()            }        }        location @50x.html {            set $from_error_page 'true';            content_by_lua_block {                require("apisix.error_handling").handle_500()            }        }    }

先注册所有的插件的 control_api 办法,调用 router:dispatch 进行路由散发

function _M.match(uri)    if cached_version ~= plugin_mod.load_times then        local err        router, err = fetch_control_api_router()        if router == nil then            core.log.error("failed to fetch valid api router: ", err)            return false        end        cached_version = plugin_mod.load_times    end    core.table.clear(match_opts)    match_opts.method = get_method()    return router:dispatch(uri, match_opts)end

例如:server_info 插件,注册门路 /v1/server_info 并指定应用 get_server_info函数进行解决

function _M.control_api()    return {        {            methods = {"GET"},            uris ={"/v1/server_info"},            handler = get_server_info,        }    }end

注册 plugin的 control_api 办法

    for _, plugin in ipairs(plugin_mod.plugins) do        local api_fun = plugin.control_api        if api_fun then            local api_route = api_fun()            register_api_routes(routes, api_route)        end    end

通过dispatch 办法调用插件的 handler 办法

   local route, err = match_route(self, path, opts or empty_table, args)    if not route then        if err then            return nil, err        end        return nil    end    local handler = route.handler    if not handler or type(handler) ~= "function" then        return nil, "missing handler"    end    handler(...)

手撸 apisix 地址 :
https://github.com/mousycoder...