关于apisix:apisix-最详细源码分析以及手撸一个-apisix

11次阅读

共计 17712 个字符,预计需要花费 45 分钟才能阅读完成。

启动

  • make run 调用 ./bin/apisix start
  • 寻找 luajit 路径,运行 /usr/local/Cellar/openresty/1.19.9.1_2/luajit/bin/luajit ./apisix/cli/apisix.lua start
  • 调用 ops.lua 里的 start 方法,初始化配置、ETCD,执行 openresty 启动命令

    --- Entry point of the `start` sub-command (abridged excerpt).
    -- Runs three steps in order: `init(env)`, `init_etcd(env, args)`, then
    -- executes the command held in `env.openresty_args`.
    -- NOTE(review): `args` is not defined anywhere in this excerpt and the
    -- varargs (`...`) are never read; presumably the full function derives
    -- `args` from them -- confirm against the complete source.
    -- @param env value handed to `init`/`init_etcd`; its `openresty_args` field is executed
    -- @param ... unused in this excerpt
    local function start(env, ...)
      init(env)
      init_etcd(env, args)
      util.execute_cmd(env.openresty_args)
    end
  • 初始化 nginx 配置,通过读取 conf/config.yaml 配合模板 ngx_tpl.lua 生成 nginx config 文件,供 openresty(nginx) 使用

     local conf_render = template.compile(ngx_tpl)
      local ngxconf = conf_render(sys_conf)
    
      local ok, err = util.write_file(env.apisix_home .. "/conf/nginx.conf",
                                      ngxconf)
      if not ok then
          util.die("failed to update nginx.conf:", err, "\n")
      end
  • 初始化 ETCD,读取 ETCD 集群配置,进行连接

    local version_url = host .. "/version"
          local errmsg
    
          local res, err
          local retry_time = 0
          while retry_time < 2 do
              res, err = request(version_url, yaml_conf)
              -- In case of failure, request returns nil followed by an error message.
              -- Else the first return value is the response body
              -- and followed by the response status code.
              if res then
                  break
              end
              retry_time = retry_time + 1
              print(str_format("Warning! Request etcd endpoint \'%s\'error, %s, retry time=%s",
                               version_url, err, retry_time))
          end
  • 执行 openresty 启动命令 openresty -p /usr/local/apisix -c /usr/local/apisix/conf/nginx.conf

      -- Build the OpenResty start command. The literals need a trailing space
      -- after `-p` and spaces around `-c`; without them the prefix path, the
      -- `-c` flag and the config path are glued into one argument.
      local openresty_args = [[openresty -p ]] .. apisix_home .. [[ -c ]]
                             .. apisix_home .. [[/conf/nginx.conf]]
    
      -- Run the command stored on `env`.
      -- NOTE(review): this reads `env.openresty_args`, not the local built
      -- above; presumably the local is assigned onto `env` in code not shown.
      util.execute_cmd(env.openresty_args)

插件流程

nginx 配置中嵌入的 apisix 流程

 init_by_lua_block {
        require "resty.core"
        apisix = require("apisix")

        local dns_resolver = {"172.19.2.70", "172.19.2.62",}
        local args = {dns_resolver = dns_resolver,}
        apisix.http_init(args)
}
    
init_worker_by_lua_block {apisix.http_init_worker()
}

exit_worker_by_lua_block {apisix.http_exit_worker()
}

access_by_lua_block {apisix.http_access_phase()
}
header_filter_by_lua_block {apisix.http_header_filter_phase()
}
body_filter_by_lua_block {apisix.http_body_filter_phase()
}
log_by_lua_block {apisix.http_log_phase()
}

proxy_pass      $upstream_scheme://apisix_backend$upstream_uri;

upstream apisix_backend {
        server 0.0.0.1;

        balancer_by_lua_block {apisix.http_balancer_phase()
        }

        keepalive 320;
        keepalive_requests 1000;
        keepalive_timeout 60s;
}


  • apisix.http_init
    1. 设置 dns resolver
    2. 设置实例 id
    3. 启动 privileged agent

      core.resolver.init_resolver(args)
      core.id.init()
      local process = require("ngx.process")
      local ok, err = process.enable_privileged_agent()
  • apisix.http_init_worker

    --- Per-worker initialisation hook (the nginx template in this document
    -- calls it from `init_worker_by_lua_block`).
    -- Seeds the RNG, configures worker events, then calls `init_worker()` on
    -- each sub-module in the order written below. Also caches the local
    -- configuration in `local_conf` and may override `ver_header`.
    function _M.http_init_worker()
      -- Seed math.random; when no seed can be read from urandom, fall back to
      -- a value built from the current time and the worker pid.
      local seed, err = core.utils.get_seed_from_urandom()
      if not seed then
          core.log.warn('failed to get seed from urandom:', err)
          seed = ngx_now() * 1000 + ngx.worker.pid()
      end
      math.randomseed(seed)
      -- for testing only
      core.log.info("random test in [1, 10000]:", math.random(1, 10000))
    
      -- Configure worker events on the "worker-events" shm; failure is fatal.
      local we = require("resty.worker.events")
      local ok, err = we.configure({shm = "worker-events", interval = 0.1})
      if not ok then
          error("failed to init worker event:" .. err)
      end
      -- Service discovery is optional: only initialised when the module exposes
      -- a `discovery` object that has an `init_worker` function.
      local discovery = require("apisix.discovery.init").discovery
      if discovery and discovery.init_worker then
          discovery.init_worker()
      end
      require("apisix.balancer").init_worker()
      -- Keep the balancer module in `load_balancer` (declared outside this
      -- function) for use by the request-phase handlers.
      load_balancer = require("apisix.balancer")
      require("apisix.admin.init").init_worker()
    
      require("apisix.timers").init_worker()
    
      plugin.init_worker()
      router.http_init_worker()
      require("apisix.http.service").init_worker()
      plugin_config.init_worker()
      require("apisix.consumer").init_worker()
    
      -- Only the YAML-file config backend gets its own init_worker call here.
      if core.config == require("apisix.core.config_yaml") then
          core.config.init_worker()
      end
    
      require("apisix.debug").init_worker()
      apisix_upstream.init_worker()
      require("apisix.plugins.ext-plugin.init").init_worker()
    
      -- Cache the local configuration for later phases.
      local_conf = core.config.local_conf()
    
      -- When server tokens are explicitly disabled, use the bare product name
      -- as the header value kept in `ver_header`.
      if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then
          ver_header = "APISIX"
      end
    end

1. 初始化 openresty worker event

    local we = require("resty.worker.events")
    local ok, err = we.configure({shm = "worker-events", interval = 0.1})
    if not ok then
        error("failed to init worker event:" .. err)
    end

2. 初始化服务发现
3. 初始化 balancer 组件
4. 初始化 admin 组件(会同步一次插件配置到 etcd)

     sync_local_conf_to_etcd(true)

5. 初始化 timers 组件
6. 初始化 plugin 组件 (清掉旧的 table, 而后把从 config-default.yaml 和 config.yaml 文件中读取插件配置放到一个 local_plugins_hash 中,并按优先级排序)

local local_conf_path = profile:yaml_path("config-default")
local default_conf_yaml, err = util.read_file(local_conf_path)
local_conf_path = profile:yaml_path("config")
local user_conf_yaml, err = util.read_file(local_conf_path)
 ok, err = merge_conf(default_conf, user_conf)


 local local_plugins = core.table.new(32, 0)

  for name in pairs(local_plugins_hash) do
        unload_plugin(name)
    end

    core.table.clear(local_plugins)
    core.table.clear(local_plugins_hash)

    for name in pairs(processed) do
        load_plugin(name, local_plugins)
    end

    -- sort by plugin's priority
    if #local_plugins > 1 then
        sort_tab(local_plugins, sort_plugin)
    end

    local plugin_metadatas, err = core.config.new("/plugin_metadata",
        {automatic = true}
    )

7. 初始化 router 组件(初始化 etcd /global_rules 数据)

    local global_rules, err = core.config.new("/global_rules", {
            automatic = true,
            item_schema = core.schema.global_rule,
            checker = plugin_checker,
        })

8. 初始化 service 组件 (从 ETCD 抓取 services 配置)

services, err = core.config.new("/services", {
        automatic = true,
        item_schema = core.schema.service,
        checker = plugin_checker,
        filter = filter,
    })

9. 初始化 consumer 组件
10. 同步 config_yaml 到各个进程

    -- sync data in each non-master process
    ngx.timer.every(1, read_apisix_yaml)

11. 初始化 upstream 组件
12. 初始化 ext-plugin 组件

  • apisix.http_exit_worker()
    进行 “privileged agent”
  • apisix.http_access_phase()

    --- Access-phase handler: matches the request to a route, runs the
    -- global rules and the rewrite/access plugins (or the route script),
    -- resolves the upstream, picks a server and runs the `before_proxy` phase.
    -- Every failure path ends the request through `core.response.exit`.
    -- For grpc/grpcs or dubbo upstreams the request is finally handed to the
    -- `@grpc_pass` / `@dubbo_pass` named locations via `ngx.exec`.
    function _M.http_access_phase()
      local ngx_ctx = ngx.ctx
    
      -- TLS client verification runs against whatever api_ctx is already on
      -- ngx.ctx, before a fresh one is fetched below.
      if not verify_tls_client(ngx_ctx.api_ctx) then
          return core.response.exit(400)
      end
    
      -- always fetch table from the table pool, we don't need a reused api_ctx
      local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
      ngx_ctx.api_ctx = api_ctx
    
      core.ctx.set_vars_meta(api_ctx)
    
      -- Optionally strip one trailing "/" from the URI.
      -- Note: the local `uri` keeps the original (unstripped) value.
      local uri = api_ctx.var.uri
      if local_conf.apisix and local_conf.apisix.delete_uri_tail_slash then
          if str_byte(uri, #uri) == str_byte("/") then
              api_ctx.var.uri = str_sub(api_ctx.var.uri, 1, #uri - 1)
              core.log.info("remove the end of uri'/', current uri:",
                            api_ctx.var.uri)
          end
      end
    
      -- Try the API router first; a match there ends this phase.
      if router.api.has_route_not_under_apisix() or
          core.string.has_prefix(uri, "/apisix/")
      then
          -- Guard `local_conf.apisix` the same way the check above does, so a
          -- config without an `apisix` section cannot raise an index error.
          local skip = local_conf.apisix
                       and local_conf.apisix.global_rule_skip_internal_api
          local matched = router.api.match(api_ctx, skip)
          if matched then
              return
          end
      end
    
      router.router_http.match(api_ctx)
    
      local route = api_ctx.matched_route
      if not route then
          -- run global rule
          plugin.run_global_rules(api_ctx, router.global_rules, nil)
    
          core.log.info("not find any matched route")
          return core.response.exit(404,
                      {error_msg = "404 Route Not Found"})
      end
    
      core.log.info("matched route:",
                    core.json.delay_encode(api_ctx.matched_route, true))
    
      local enable_websocket = route.value.enable_websocket
    
      -- Merge a referenced plugin config into the route.
      if route.value.plugin_config_id then
          local conf = plugin_config.get(route.value.plugin_config_id)
          if not conf then
              core.log.error("failed to fetch plugin config by",
                              "id:", route.value.plugin_config_id)
              return core.response.exit(503)
          end
    
          route = plugin_config.merge(route, conf)
      end
    
      -- Merge a referenced service into the route and record identifiers of
      -- both on the context; otherwise record the route's identifiers only.
      if route.value.service_id then
          local service = service_fetch(route.value.service_id)
          if not service then
              core.log.error("failed to fetch service configuration by",
                             "id:", route.value.service_id)
              return core.response.exit(404)
          end
    
          route = plugin.merge_service_route(service, route)
          api_ctx.matched_route = route
          api_ctx.conf_type = "route&service"
          api_ctx.conf_version = route.modifiedIndex .. "&" .. service.modifiedIndex
          api_ctx.conf_id = route.value.id .. "&" .. service.value.id
          api_ctx.service_id = service.value.id
          api_ctx.service_name = service.value.name
    
          -- The service's websocket flag only applies when the route has none.
          if enable_websocket == nil then
              enable_websocket = service.value.enable_websocket
          end
    
      else
          api_ctx.conf_type = "route"
          api_ctx.conf_version = route.modifiedIndex
          api_ctx.conf_id = route.value.id
      end
      api_ctx.route_id = route.value.id
      api_ctx.route_name = route.value.name
    
      -- run global rule
      plugin.run_global_rules(api_ctx, router.global_rules, nil)
    
      -- A route script replaces the plugin chain for this route.
      if route.value.script then
          script.load(route, api_ctx)
          script.run("access", api_ctx)
    
      else
          local plugins = plugin.filter(route)
          api_ctx.plugins = plugins
    
          plugin.run_plugin("rewrite", plugins, api_ctx)
          -- If the rewrite phase set a consumer, merge its config into the
          -- route and re-filter the plugin list when that changed anything.
          if api_ctx.consumer then
              local changed
              route, changed = plugin.merge_consumer_route(
                  route,
                  api_ctx.consumer,
                  api_ctx
              )
    
              core.log.info("find consumer", api_ctx.consumer.username,
                            ", config changed:", changed)
    
              if changed then
                  api_ctx.matched_route = route
                  core.table.clear(api_ctx.plugins)
                  api_ctx.plugins = plugin.filter(route, api_ctx.plugins)
              end
          end
          plugin.run_plugin("access", plugins, api_ctx)
      end
    
      local up_id = route.value.upstream_id
    
      -- used for the traffic-split plugin
      if api_ctx.upstream_id then
          up_id = api_ctx.upstream_id
      end
    
      if up_id then
          local upstream = get_upstream_by_id(up_id)
          api_ctx.matched_upstream = upstream
    
      else
          -- Upstream is defined inline on the route; resolve domains first.
          if route.has_domain then
              local err
              route, err = parse_domain_in_route(route)
              if err then
                  core.log.error("failed to get resolved route:", err)
                  return core.response.exit(500)
              end
    
              api_ctx.conf_version = route.modifiedIndex
              api_ctx.matched_route = route
          end
    
          local route_val = route.value
          if route_val.upstream and route_val.upstream.enable_websocket then
              enable_websocket = true
          end
    
          -- Prefer the upstream under `dns_value` when the route has one.
          api_ctx.matched_upstream = (route.dns_value and
                                      route.dns_value.upstream)
                                     or route_val.upstream
      end
    
      -- Forward the client's Upgrade/Connection headers for websocket.
      if enable_websocket then
          api_ctx.var.upstream_upgrade    = api_ctx.var.http_upgrade
          api_ctx.var.upstream_connection = api_ctx.var.http_connection
          core.log.info("enabled websocket for route:", route.value.id)
      end
    
      if route.value.service_protocol == "grpc" then
          api_ctx.upstream_scheme = "grpc"
      end
    
      local code, err = set_upstream(route, api_ctx)
      if code then
          core.log.error("failed to set upstream:", err)
          -- `return` added to match every other exit in this function: never
          -- fall through to server picking after set_upstream has failed.
          return core.response.exit(code)
      end
    
      local server, err = load_balancer.pick_server(route, api_ctx)
      if not server then
          core.log.error("failed to pick server:", err)
          return core.response.exit(502)
      end
    
      api_ctx.picked_server = server
    
      set_upstream_headers(api_ctx, server)
    
      -- run the before_proxy method in access phase first to avoid always reinit request
      common_phase("before_proxy")
    
      -- Stash ngx.ctx and publish the reference through the `ctx_ref` nginx
      -- variable; the header-filter and log handlers read that variable.
      local ref = ctxdump.stash_ngx_ctx()
      core.log.info("stash ngx ctx:", ref)
      ngx_var.ctx_ref = ref
    
      local up_scheme = api_ctx.upstream_scheme
      if up_scheme == "grpcs" or up_scheme == "grpc" then
          return ngx.exec("@grpc_pass")
      end
    
      if api_ctx.dubbo_proxy_enabled then
          return ngx.exec("@dubbo_pass")
      end
    end

    1. 初始化 api_ctx 上下文
    2.client tls 验证
    3. 是否为 apisix 已经注册的路径,对请求进行匹配,内部使用 Radix tree 进行匹配
    4. 向上下文注入该请求匹配的 apisix route,service 等信息用于后续阶段使用
    5. 向上下文注入该请求相关的插件,例如:请求对应的路由存在插件,若请求存在对应的 service 则加入 service 定义的插件,以及全局插件
    6. 调用 “rewrite” 阶段的插件
    7. 调用 “access” 阶段的插件
    8. 获取 upstream
    9. 执行 loadbalancer 选择 server
    10. 调用 “before_proxy” 阶段插件
    11. 判断 upstream,根据 upstream 类型,grpc dubbo 等进入 @grpc_pass @dubbo_pass 等不同的后续处理流程。这些配置可在 nginx.conf 中查看

  • apisix.http_header_filter_phase()

    --- Header-filter handler.
    -- Restores a stashed context when one is referenced, sets the "Server"
    -- response header from `ver_header`, reports 5xx upstream statuses through
    -- `set_resp_upstream_status`, then runs the "header_filter" phase.
    function _M.http_header_filter_phase()
      if ngx_var.ctx_ref ~= '' then
          -- prevent for the table leak
          local stash_ctx = fetch_ctx()
    
          -- internal redirect, so we should apply the ctx
          if ngx_var.from_error_page == "true" then
              ngx.ctx = stash_ctx
          end
      end
    
      core.response.set_header("Server", ver_header)
    
      local up_status = get_var("upstream_status")
      if up_status and #up_status == 3 then
          -- Single status. Convert once and nil-check, so a non-numeric value
          -- is skipped instead of raising on a nil comparison.
          local status = tonumber(up_status)
          if status and status >= 500 and status <= 599 then
              set_resp_upstream_status(up_status)
          end
      elseif up_status and #up_status > 3 then
          -- the up_status can be "502, 502" or "502, 502 :"
          -- Only the last status in the list is examined; when the value ends
          -- with a space, the slice is shifted left past the trailing chars.
          local last_status
          if str_byte(up_status, -1) == str_byte(" ") then
              last_status = str_sub(up_status, -6, -3)
          else
              last_status = str_sub(up_status, -3)
          end
    
          -- Same nil-safe conversion as above.
          local status = tonumber(last_status)
          if status and status >= 500 and status <= 599 then
              set_resp_upstream_status(up_status)
          end
      end
    
      common_phase("header_filter")
    end

1. 设置头 “Server”, APISIX
2. 设置上游状态头:X-APISIX-Upstream-Status
3. 执行“header_filter”阶段的插件

  • apisix.http_body_filter_phase()

    --- Body-filter handler: delegates to `common_phase` with the phase name
    -- "body_filter". Whatever `common_phase` returns is discarded.
    function _M.http_body_filter_phase()
      common_phase("body_filter")
    end

    执行“body_filter”阶段的插件

  • apisix.http_log_phase()

    --- Log-phase handler.
    -- Restores a stashed context when one is referenced, runs the "log" phase,
    -- performs passive health checking and the balancer's `after_balance`
    -- callback, then returns the per-request pooled tables to their pools.
    function _M.http_log_phase()
      if ngx_var.ctx_ref ~= '' then
          -- prevent for the table leak
          local stash_ctx = fetch_ctx()
    
          -- internal redirect, so we should apply the ctx
          if ngx_var.from_error_page == "true" then
              ngx.ctx = stash_ctx
          end
      end
    
      -- `common_phase` returns nothing when there is no api_ctx on ngx.ctx;
      -- in that case there is nothing to clean up.
      local api_ctx = common_phase("log")
      if not api_ctx then
          return
      end
    
      healthcheck_passive(api_ctx)
    
      if api_ctx.server_picker and api_ctx.server_picker.after_balance then
          api_ctx.server_picker.after_balance(api_ctx, false)
      end
    
      -- Release pooled tables. The sub-tables hanging off api_ctx go first;
      -- api_ctx itself is released last.
      if api_ctx.uri_parse_param then
          core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param)
      end
    
      core.ctx.release_vars(api_ctx)
      if api_ctx.plugins then
          core.tablepool.release("plugins", api_ctx.plugins)
      end
    
      if api_ctx.curr_req_matched then
          core.tablepool.release("matched_route_record", api_ctx.curr_req_matched)
      end
    
      core.tablepool.release("api_ctx", api_ctx)
    end

    1. 执行“log”阶段的插件
    2. 回收 plugins, matched_route_record,api_ctx 缓存

  • apisix.http_balancer_phase()

    --- Balancer-phase handler.
    -- Needs the `api_ctx` stored on `ngx.ctx`. When present, passes the matched
    -- route, the context and `common_phase` to `load_balancer.run`. When it is
    -- missing, logs an error and exits with HTTP 500.
    function _M.http_balancer_phase()
      local ctx = ngx.ctx.api_ctx
      if ctx then
          load_balancer.run(ctx.matched_route, ctx, common_phase)
          return
      end
    
      core.log.error("invalid api_ctx")
      return core.response.exit(500)
    end

    1. 设置 balance 的基本参数(超时时间,连接失败重试次数)
    2. 选择上游服务器, 采用一致性 hash 算法
    3. 调用 set_current_peer 设置 proxy_pass

自定义插件逻辑





通过 common_phase 作为自定义插件方法的公共入口,被 openresty 各个环节调用

--- Shared entry point used by the phase handlers to run plugin logic.
-- Does nothing (and returns nothing) when `ngx.ctx` carries no `api_ctx`.
-- Otherwise runs the global rules for the phase, then either the route
-- script (when `api_ctx.script_obj` is set) or the plugin chain.
-- @param phase_name name of the phase to run, e.g. "log" or "header_filter"
-- @return `api_ctx, true` after running a script; otherwise whatever
--         `plugin.run_plugin` returns
local function common_phase(phase_name)
    local ctx = ngx.ctx.api_ctx
    if ctx then
        plugin.run_global_rules(ctx, ctx.global_rules, phase_name)

        if not ctx.script_obj then
            return plugin.run_plugin(phase_name, nil, ctx)
        end

        script.run(phase_name, ctx)
        return ctx, true
    end
end

真正执行自定义插件逻辑

 for i = 1, #plugins, 2 do
            local phase_func = plugins[i][phase]
            if phase_func then
                plugin_run = true
                local code, body = phase_func(plugins[i + 1], api_ctx)
                if code or body then
                    if is_http then
                        if code >= 400 then
                            core.log.warn(plugins[i].name, "exits with http status code", code)
                        end

                        core.response.exit(code, body)
                    else
                        if code >= 400 then
                            core.log.warn(plugins[i].name, "exits with status code", code)
                        end

                        ngx_exit(1)
                    end
                end
            end
        end

admin api

        location /apisix/admin {
            set $upstream_scheme             'http';
            set $upstream_host               $http_host;
            set $upstream_uri                '';

                allow 127.0.0.0/24;
                deny all;

            content_by_lua_block {apisix.http_admin()
            }
        }

自定义插件

入口

--- Shared entry point used by the phase handlers to run plugin logic.
-- Does nothing (and returns nothing) when `ngx.ctx` carries no `api_ctx`.
-- Otherwise runs the global rules for the phase, then either the route
-- script (when `api_ctx.script_obj` is set) or the plugin chain.
-- @param phase_name name of the phase to run, e.g. "log" or "header_filter"
-- @return `api_ctx, true` after running a script; otherwise whatever
--         `plugin.run_plugin` returns
local function common_phase(phase_name)
    local ctx = ngx.ctx.api_ctx
    if ctx then
        plugin.run_global_rules(ctx, ctx.global_rules, phase_name)

        if not ctx.script_obj then
            return plugin.run_plugin(phase_name, nil, ctx)
        end

        script.run(phase_name, ctx)
        return ctx, true
    end
end

已知的阶段
preread
ssl
access
balancer
rewrite
header_filter
body_filter
log

admin api

入口

        location /apisix/admin {
            set $upstream_scheme             'http';
            set $upstream_host               $http_host;
            set $upstream_uri                '';

                allow 127.0.0.0/24;
                deny all;

            content_by_lua_block {apisix.http_admin()
            }
        }

根据路由进行转发

    local ok = router:dispatch(get_var("uri"), {method = get_method()})

具体路由 dispatch 逻辑

--- Match `path` against the router and invoke the matched route's handler.
-- @param self router instance; `self.args` is reused as a scratch table
-- @param path request path, must be a string (raises at the caller otherwise)
-- @param opts match options; `empty_table` is used when nil
-- @param ... extra values, forwarded to `match_route` (packed in a table whose
--            index 0 holds the count) and to the handler
-- @return true once the handler has been called; nil (plus an error message
--         when one exists) if nothing matched; nil, "missing handler" if the
--         matched route has no callable handler
function _M.dispatch(self, path, opts, ...)
    if type(path) ~= "string" then
        error("invalid argument path", 2)
    end

    local argc = select('#', ...)
    local packed
    if argc > 0 then
        -- Reuse the table cached on the router when there is one.
        -- self.args is shared state, so nothing may yield before the
        -- filter function has consumed it.
        packed = self.args
        if packed then
            clear_tab(packed)
            for idx = 1, argc do
                packed[idx] = select(idx, ...)
            end
        else
            packed = {...}
            self.args = packed
        end
        packed[0] = argc
    end

    local matched, match_err = match_route(self, path, opts or empty_table,
                                           packed)
    if not matched then
        if not match_err then
            return nil
        end
        return nil, match_err
    end

    local handler = matched.handler
    if type(handler) ~= "function" then
        return nil, "missing handler"
    end

    handler(...)
    return true
end

路由与 handler 的关系

-- Admin API route table: each entry pairs path pattern(s) and the allowed
-- HTTP methods with the handler function to invoke.
local uri_route = {
    -- everything under /apisix/admin/ -> `run`
    {paths = [[/apisix/admin/*]],
        methods = {"GET", "PUT", "POST", "DELETE", "PATCH"},
        handler = run,
    },
    -- stream routes -> `run_stream`
    {paths = [[/apisix/admin/stream_routes/*]],
        methods = {"GET", "PUT", "POST", "DELETE", "PATCH"},
        handler = run_stream,
    },
    -- plugin list, read-only
    {paths = [[/apisix/admin/plugins/list]],
        methods = {"GET"},
        handler = get_plugins_list,
    },
    -- plugin reload; the path comes from `reload_event`, defined outside
    -- this excerpt
    {
        paths = reload_event,
        methods = {"PUT"},
        handler = post_reload_plugins,
    },
}

调用 run 方法


--- Generic Admin API handler.
-- Checks the token, derives the resource name, id and sub-path from the URI
-- segments, validates the request body and the `ttl` argument, then calls
-- `resources[<resource>][<lower-cased method>]` and sends its result.
-- Each error path ends the request through `core.response.exit`.
local function run()
    -- Build a fresh api_ctx for this admin request and publish it on ngx.ctx.
    local api_ctx = {}
    core.ctx.set_vars_meta(api_ctx)
    ngx.ctx.api_ctx = api_ctx

    local ok, err = check_token(api_ctx)
    if not ok then
        core.log.warn("failed to check token:", err)
        core.response.exit(401)
    end

    local uri_segs = core.utils.split_uri(ngx.var.uri)
    core.log.info("uri:", core.json.delay_encode(uri_segs))

    -- /apisix/admin/schema/route
    -- Segment 4 is the resource name, segment 5 the id, the rest the sub-path.
    local seg_res, seg_id = uri_segs[4], uri_segs[5]
    local seg_sub_path = core.table.concat(uri_segs, "/", 6)
    if seg_res == "schema" and seg_id == "plugins" then
        -- /apisix/admin/schema/plugins/limit-count
        -- For plugin schemas everything shifts one segment to the right.
        seg_res, seg_id = uri_segs[5], uri_segs[6]
        seg_sub_path = core.table.concat(uri_segs, "/", 7)
    end

    -- Unknown resource, or a method the resource does not implement -> 404.
    local resource = resources[seg_res]
    if not resource then
        core.response.exit(404)
    end

    local method = str_lower(get_method())
    if not resource[method] then
        core.response.exit(404)
    end

    local req_body, err = core.request.get_body(MAX_REQ_BODY)
    if err then
        core.log.error("failed to read request body:", err)
        core.response.exit(400, {error_msg = "invalid request body:" .. err})
    end

    -- A body, when present, must decode as JSON; the decoded value then
    -- replaces the raw string.
    if req_body then
        local data, err = core.json.decode(req_body)
        if not data then
            core.log.error("invalid request body:", req_body, "err:", err)
            core.response.exit(400, {error_msg = "invalid request body:" .. err,
                                     req_body = req_body})
        end

        req_body = data
    end

    -- `ttl`, when given, must be numeric.
    local uri_args = ngx.req.get_uri_args() or {}
    if uri_args.ttl then
        if not tonumber(uri_args.ttl) then
            core.response.exit(400, {error_msg = "invalid argument ttl:"
                                                 .. "should be a number"})
        end
    end

    -- Dispatch to the resource's handler for this method. A response is sent
    -- only when the handler returns a status code.
    local code, data = resource[method](seg_id, req_body, seg_sub_path,
                                        uri_args)
    if code then
        data = strip_etcd_resp(data)
        core.response.exit(code, data)
    end
end

根据 resource 定义找到对应的 lua 模块的执行方法

-- Lookup table from an Admin API resource name to the module that
-- implements it. Note that the `plugin_configs` key (plural) loads the
-- module "apisix.admin.plugin_config" (singular).
local resources = {routes          = require("apisix.admin.routes"),
    services        = require("apisix.admin.services"),
    upstreams       = require("apisix.admin.upstreams"),
    consumers       = require("apisix.admin.consumers"),
    schema          = require("apisix.admin.schema"),
    ssl             = require("apisix.admin.ssl"),
    plugins         = require("apisix.admin.plugins"),
    proto           = require("apisix.admin.proto"),
    global_rules    = require("apisix.admin.global_rules"),
    stream_routes   = require("apisix.admin.stream_routes"),
    plugin_metadata = require("apisix.admin.plugin_metadata"),
    plugin_configs  = require("apisix.admin.plugin_config"),
}

执行对应模块的对应方法,例如:
GET http://127.0.0.1:9080/apisix/…
就是直接调用 plugin_metadata 的 get 方法

--- Read plugin metadata from etcd.
-- With a `key`, reads "/plugin_metadata/<key>"; without one, reads
-- "/plugin_metadata" and passes `true` as the second argument to
-- `core.etcd.get`.
-- @param key optional metadata key
-- @return the etcd response's status and body, or 503 and a table holding
--         the error message when the read fails
function _M.get(key)
    local etcd_path = "/plugin_metadata"
    local no_key = not key
    if not no_key then
        etcd_path = etcd_path .. "/" .. key
    end

    local resp, get_err = core.etcd.get(etcd_path, no_key)
    if resp then
        return resp.status, resp.body
    end

    core.log.error("failed to get metadata[", key, "]:", get_err)
    return 503, {error_msg = get_err}
end

control api

入口 apisix.http_control()

server {
        listen 127.0.0.1:9090;

        access_log off;

        location / {
            content_by_lua_block {apisix.http_control()
            }
        }

        location @50x.html {
            set $from_error_page 'true';
            content_by_lua_block {require("apisix.error_handling").handle_500()}
        }
    }

先注册所有插件的 control_api 方法,调用 router:dispatch 进行路由分发

--- Dispatch a control-API request for `uri`.
-- Rebuilds the cached router whenever `plugin_mod.load_times` differs from
-- the version recorded at the last build.
-- @param uri request URI to dispatch
-- @return false when no router could be built; otherwise whatever
--         `router:dispatch` returns
function _M.match(uri)
    if cached_version ~= plugin_mod.load_times then
        local err
        router, err = fetch_control_api_router()
        if router == nil then
            core.log.error("failed to fetch valid api router:", err)
            return false
        end

        -- Record the version only after a successful rebuild, so a failed
        -- build is retried on the next call.
        cached_version = plugin_mod.load_times
    end

    -- `match_opts` is a table reused across calls; reset it before filling
    -- in the current request method.
    core.table.clear(match_opts)
    match_opts.method = get_method()

    return router:dispatch(uri, match_opts)
end

例如:server_info 插件,注册路径 /v1/server_info 并指定使用 get_server_info 函数进行处理

--- Declare this plugin's control-API routes.
-- @return a list holding one route: GET /v1/server_info, handled by
--         `get_server_info`
function _M.control_api()
    local server_info_route = {
        methods = {"GET"},
        uris = {"/v1/server_info"},
        handler = get_server_info,
    }

    return {server_info_route}
end

注册 plugin 的 control_api 方法

    for _, plugin in ipairs(plugin_mod.plugins) do
        local api_fun = plugin.control_api
        if api_fun then
            local api_route = api_fun()
            register_api_routes(routes, api_route)
        end
    end

通过 dispatch 方法调用插件的 handler 方法

   local route, err = match_route(self, path, opts or empty_table, args)
    if not route then
        if err then
            return nil, err
        end
        return nil
    end

    local handler = route.handler
    if not handler or type(handler) ~= "function" then
        return nil, "missing handler"
    end

    handler(...)

手撸 apisix 地址:
https://github.com/mousycoder…

正文完
 0