共计 17712 个字符,预计需要花费 45 分钟才能阅读完成。
启动
- make run 调用 ./bin/apisix start
- 寻找 juajit 门路 运行 /usr/local/Cellar/openresty/1.19.9.1_2/luajit/bin/luajit ./apisix/cli/apisix.lua start
-
调用 ops.lua 里的 start 办法, 初始化配置,ETCD,执行 openresty 启动命令
local function start(env, ...) init(env) init_etcd(env, args) util.execute_cmd(env.openresty_args) end
-
初始化 nginx 配置,通过读取 conf/config.yaml 配合模板 ngx_tpl.lua 生成 nginx config 文件。供 openresty(nginx) 应用
local conf_render = template.compile(ngx_tpl) local ngxconf = conf_render(sys_conf) local ok, err = util.write_file(env.apisix_home .. "/conf/nginx.conf", ngxconf) if not ok then util.die("failed to update nginx.conf:", err, "\n") end
-
初始化 ETCD, 读取 ETCD 集群配置,进行连贯
local version_url = host .. "/version" local errmsg local res, err local retry_time = 0 while retry_time < 2 do res, err = request(version_url, yaml_conf) -- In case of failure, request returns nil followed by an error message. -- Else the first return value is the response body -- and followed by the response status code. if res then break end retry_time = retry_time + 1 print(str_format("Warning! Request etcd endpoint \'%s\'error, %s, retry time=%s", version_url, err, retry_time)) end
-
执行 openresty 启动命令 openresty -p /usr/local/apisix -c /conf/nginx.conf
local openresty_args = [[openresty -p]] .. apisix_home .. [[-c]] .. apisix_home .. [[/conf/nginx.conf]] util.execute_cmd(env.openresty_args)
插件流程
nginx 配置中嵌入的 apisix 流程
init_by_lua_block {
require "resty.core"
apisix = require("apisix")
local dns_resolver = {"172.19.2.70", "172.19.2.62",}
local args = {dns_resolver = dns_resolver,}
apisix.http_init(args)
}
init_worker_by_lua_block {apisix.http_init_worker()
}
exit_worker_by_lua_block {apisix.http_exit_worker()
}
access_by_lua_block {apisix.http_access_phase()
}
header_filter_by_lua_block {apisix.http_header_filter_phase()
}
body_filter_by_lua_block {apisix.http_body_filter_phase()
}
log_by_lua_block {apisix.http_log_phase()
}
proxy_pass $upstream_scheme://apisix_backend$upstream_uri;
upstream apisix_backend {
server 0.0.0.1;
balancer_by_lua_block {apisix.http_balancer_phase()
}
keepalive 320;
keepalive_requests 1000;
keepalive_timeout 60s;
}
-
apisix.http_init
1. 设置 dns resolver
2. 设置实例 id
3. 启动 privileged agentcore.resolver.init_resolver(args) core.id.init() local process = require("ngx.process") local ok, err = process.enable_privileged_agent()
-
apisix.http_init_worker
function _M.http_init_worker() local seed, err = core.utils.get_seed_from_urandom() if not seed then core.log.warn('failed to get seed from urandom:', err) seed = ngx_now() * 1000 + ngx.worker.pid() end math.randomseed(seed) -- for testing only core.log.info("random test in [1, 10000]:", math.random(1, 10000)) local we = require("resty.worker.events") local ok, err = we.configure({shm = "worker-events", interval = 0.1}) if not ok then error("failed to init worker event:" .. err) end local discovery = require("apisix.discovery.init").discovery if discovery and discovery.init_worker then discovery.init_worker() end require("apisix.balancer").init_worker() load_balancer = require("apisix.balancer") require("apisix.admin.init").init_worker() require("apisix.timers").init_worker() plugin.init_worker() router.http_init_worker() require("apisix.http.service").init_worker() plugin_config.init_worker() require("apisix.consumer").init_worker() if core.config == require("apisix.core.config_yaml") then core.config.init_worker() end require("apisix.debug").init_worker() apisix_upstream.init_worker() require("apisix.plugins.ext-plugin.init").init_worker() local_conf = core.config.local_conf() if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then ver_header = "APISIX" end end
1. 初始化 openresty worker event
local we = require("resty.worker.events")
local ok, err = we.configure({shm = "worker-events", interval = 0.1})
if not ok then
error("failed to init worker event:" .. err)
end
2. 初始化服务发现
3. 初始化 balancer 组件
4. 初始化 admin 组件(会同步一次插件配置到 etcd)
sync_local_conf_to_etcd(true)
5. 初始化 timers 组件
6. 初始化 plugin 组件 (清掉旧的 table, 而后把从 config-default.yaml 和 config.yaml 文件中读取插件配置放到一个 local_plugins_hash 中,并按优先级排序)
local local_conf_path = profile:yaml_path("config-default")
local default_conf_yaml, err = util.read_file(local_conf_path)
local_conf_path = profile:yaml_path("config")
local user_conf_yaml, err = util.read_file(local_conf_path)
ok, err = merge_conf(default_conf, user_conf)
local local_plugins = core.table.new(32, 0)
for name in pairs(local_plugins_hash) do
unload_plugin(name)
end
core.table.clear(local_plugins)
core.table.clear(local_plugins_hash)
for name in pairs(processed) do
load_plugin(name, local_plugins)
end
-- sort by plugin's priority
if #local_plugins > 1 then
sort_tab(local_plugins, sort_plugin)
end
local plugin_metadatas, err = core.config.new("/plugin_metadata",
{automatic = true}
)
7. 初始化 router 组件(初始化 etcd /global_rules 数据)
local global_rules, err = core.config.new("/global_rules", {
automatic = true,
item_schema = core.schema.global_rule,
checker = plugin_checker,
})
8. 初始化 servers 组件 (从 ETCD 抓取 services 配置)
services, err = core.config.new("/services", {
automatic = true,
item_schema = core.schema.service,
checker = plugin_checker,
filter = filter,
})
9. 初始化 sonsumer 组件
10. 同步 config_yaml 到各个过程
-- sync data in each non-master process
ngx.timer.every(1, read_apisix_yaml)
11. 初始化 upstream 组件
12. 初始化 ext-plugin 组件
- apisix.http_exit_worker()
进行 “privileged agent” -
apisix.http_access_phase()
function _M.http_access_phase() local ngx_ctx = ngx.ctx if not verify_tls_client(ngx_ctx.api_ctx) then return core.response.exit(400) end -- always fetch table from the table pool, we don't need a reused api_ctx local api_ctx = core.tablepool.fetch("api_ctx", 0, 32) ngx_ctx.api_ctx = api_ctx core.ctx.set_vars_meta(api_ctx) local uri = api_ctx.var.uri if local_conf.apisix and local_conf.apisix.delete_uri_tail_slash then if str_byte(uri, #uri) == str_byte("/") then api_ctx.var.uri = str_sub(api_ctx.var.uri, 1, #uri - 1) core.log.info("remove the end of uri'/', current uri:", api_ctx.var.uri) end end if router.api.has_route_not_under_apisix() or core.string.has_prefix(uri, "/apisix/") then local skip = local_conf and local_conf.apisix.global_rule_skip_internal_api local matched = router.api.match(api_ctx, skip) if matched then return end end router.router_http.match(api_ctx) local route = api_ctx.matched_route if not route then -- run global rule plugin.run_global_rules(api_ctx, router.global_rules, nil) core.log.info("not find any matched route") return core.response.exit(404, {error_msg = "404 Route Not Found"}) end core.log.info("matched route:", core.json.delay_encode(api_ctx.matched_route, true)) local enable_websocket = route.value.enable_websocket if route.value.plugin_config_id then local conf = plugin_config.get(route.value.plugin_config_id) if not conf then core.log.error("failed to fetch plugin config by", "id:", route.value.plugin_config_id) return core.response.exit(503) end route = plugin_config.merge(route, conf) end if route.value.service_id then local service = service_fetch(route.value.service_id) if not service then core.log.error("failed to fetch service configuration by", "id:", route.value.service_id) return core.response.exit(404) end route = plugin.merge_service_route(service, route) api_ctx.matched_route = route api_ctx.conf_type = "route&service" api_ctx.conf_version = route.modifiedIndex .. "&" .. service.modifiedIndex api_ctx.conf_id = route.value.id .. "&" .. service.value.id api_ctx.service_id = service.value.id api_ctx.service_name = service.value.name if enable_websocket == nil then enable_websocket = service.value.enable_websocket end else api_ctx.conf_type = "route" api_ctx.conf_version = route.modifiedIndex api_ctx.conf_id = route.value.id end api_ctx.route_id = route.value.id api_ctx.route_name = route.value.name -- run global rule plugin.run_global_rules(api_ctx, router.global_rules, nil) if route.value.script then script.load(route, api_ctx) script.run("access", api_ctx) else local plugins = plugin.filter(route) api_ctx.plugins = plugins plugin.run_plugin("rewrite", plugins, api_ctx) if api_ctx.consumer then local changed route, changed = plugin.merge_consumer_route( route, api_ctx.consumer, api_ctx ) core.log.info("find consumer", api_ctx.consumer.username, ", config changed:", changed) if changed then api_ctx.matched_route = route core.table.clear(api_ctx.plugins) api_ctx.plugins = plugin.filter(route, api_ctx.plugins) end end plugin.run_plugin("access", plugins, api_ctx) end local up_id = route.value.upstream_id -- used for the traffic-split plugin if api_ctx.upstream_id then up_id = api_ctx.upstream_id end if up_id then local upstream = get_upstream_by_id(up_id) api_ctx.matched_upstream = upstream else if route.has_domain then local err route, err = parse_domain_in_route(route) if err then core.log.error("failed to get resolved route:", err) return core.response.exit(500) end api_ctx.conf_version = route.modifiedIndex api_ctx.matched_route = route end local route_val = route.value if route_val.upstream and route_val.upstream.enable_websocket then enable_websocket = true end api_ctx.matched_upstream = (route.dns_value and route.dns_value.upstream) or route_val.upstream end if enable_websocket then api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade api_ctx.var.upstream_connection = api_ctx.var.http_connection core.log.info("enabled websocket for route:", route.value.id) end if route.value.service_protocol == "grpc" then api_ctx.upstream_scheme = "grpc" end local code, err = set_upstream(route, api_ctx) if code then core.log.error("failed to set upstream:", err) core.response.exit(code) end local server, err = load_balancer.pick_server(route, api_ctx) if not server then core.log.error("failed to pick server:", err) return core.response.exit(502) end api_ctx.picked_server = server set_upstream_headers(api_ctx, server) -- run the before_proxy method in access phase first to avoid always reinit request common_phase("before_proxy") local ref = ctxdump.stash_ngx_ctx() core.log.info("stash ngx ctx:", ref) ngx_var.ctx_ref = ref local up_scheme = api_ctx.upstream_scheme if up_scheme == "grpcs" or up_scheme == "grpc" then return ngx.exec("@grpc_pass") end if api_ctx.dubbo_proxy_enabled then return ngx.exec("@dubbo_pass") end end
1. 初始化 api_ctx 上下文
2.client tls 验证
3. 是否为 apisix 曾经注册的门路,对申请进行匹配,外部应用 Radix tree 进行匹配
4. 向上下文注入该申请匹配的 apisix route,service 等信息用于后续阶段应用
5. 向上下文注入该申请相干的插件,例如:申请对应的路由存在插件,若申请存在对应的 service 则退出 service 定义的插件,以及全局插件
6. 调用 “rewrite” 阶段的插件
7. 调用 “access” 阶段的插件
8. 获取 upstream
9. 执行 loadbalancer 抉择 server
10. 调用 “balancer” 阶段插件
11. 判断 upstream,依据 upstream 类型,grpc dubbo 等进入 @grpc_pass @dubbo_pass 等不同的后续解决流程。这些配置可在 nginx.conf 中查看 -
apisix.http_balancer_phase()
function _M.http_header_filter_phase() if ngx_var.ctx_ref ~= '' then -- prevent for the table leak local stash_ctx = fetch_ctx() -- internal redirect, so we should apply the ctx if ngx_var.from_error_page == "true" then ngx.ctx = stash_ctx end end core.response.set_header("Server", ver_header) local up_status = get_var("upstream_status") if up_status and #up_status == 3 and tonumber(up_status) >= 500 and tonumber(up_status) <= 599 then set_resp_upstream_status(up_status) elseif up_status and #up_status > 3 then -- the up_status can be "502, 502" or "502, 502 :" local last_status if str_byte(up_status, -1) == str_byte(" ") then last_status = str_sub(up_status, -6, -3) else last_status = str_sub(up_status, -3) end if tonumber(last_status) >= 500 and tonumber(last_status) <= 599 then set_resp_upstream_status(up_status) end end common_phase("header_filter") end
1. 设置头 “Server”, APISIX
2. 设置上游状态头:X-APISIX-Upstream-Status
3. 执行“header_filter”阶段的插件
-
apisix.http_body_filter_phase()
function _M.http_body_filter_phase() common_phase("body_filter") end
执行“body_filter”阶段的插件
-
apisix.http_log_phase()
function _M.http_log_phase() if ngx_var.ctx_ref ~= '' then -- prevent for the table leak local stash_ctx = fetch_ctx() -- internal redirect, so we should apply the ctx if ngx_var.from_error_page == "true" then ngx.ctx = stash_ctx end end local api_ctx = common_phase("log") if not api_ctx then return end healthcheck_passive(api_ctx) if api_ctx.server_picker and api_ctx.server_picker.after_balance then api_ctx.server_picker.after_balance(api_ctx, false) end if api_ctx.uri_parse_param then core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param) end core.ctx.release_vars(api_ctx) if api_ctx.plugins then core.tablepool.release("plugins", api_ctx.plugins) end if api_ctx.curr_req_matched then core.tablepool.release("matched_route_record", api_ctx.curr_req_matched) end core.tablepool.release("api_ctx", api_ctx) end
1. 执行“log”阶段的插件
2. 回收 plugins, matched_route_record,api_ctx 缓存 -
apisix.http_balancer_phase()
function _M.http_balancer_phase() local api_ctx = ngx.ctx.api_ctx if not api_ctx then core.log.error("invalid api_ctx") return core.response.exit(500) end load_balancer.run(api_ctx.matched_route, api_ctx, common_phase) end
1. 设置 balance 的基本参数(超时工夫,连贯失败重试次数)
2. 抉择上游服务器, 采纳一致性 hash 算法
3. 调用 set_current_peer 设置 proxy_pass
自定义插件逻辑
通过 common_phase 作为自定义插件的办法的公共入口,被 openresty 各个环节调用
local function common_phase(phase_name)
local api_ctx = ngx.ctx.api_ctx
if not api_ctx then
return
end
plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)
if api_ctx.script_obj then
script.run(phase_name, api_ctx)
return api_ctx, true
end
return plugin.run_plugin(phase_name, nil, api_ctx)
end
真正执行自定义插件逻辑
for i = 1, #plugins, 2 do
local phase_func = plugins[i][phase]
if phase_func then
plugin_run = true
local code, body = phase_func(plugins[i + 1], api_ctx)
if code or body then
if is_http then
if code >= 400 then
core.log.warn(plugins[i].name, "exits with http status code", code)
end
core.response.exit(code, body)
else
if code >= 400 then
core.log.warn(plugins[i].name, "exits with status code", code)
end
ngx_exit(1)
end
end
end
end
admin api
location /apisix/admin {
set $upstream_scheme 'http';
set $upstream_host $http_host;
set $upstream_uri '';
allow 127.0.0.0/24;
deny all;
content_by_lua_block {apisix.http_admin()
}
}
自定义插件
入口
local function common_phase(phase_name)
local api_ctx = ngx.ctx.api_ctx
if not api_ctx then
return
end
plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)
if api_ctx.script_obj then
script.run(phase_name, api_ctx)
return api_ctx, true
end
return plugin.run_plugin(phase_name, nil, api_ctx)
end
已知的阶段
preread
ssl
access
balancer
rewrite
header_filter
body_filter
log
admin api
入口
location /apisix/admin {
set $upstream_scheme 'http';
set $upstream_host $http_host;
set $upstream_uri '';
allow 127.0.0.0/24;
deny all;
content_by_lua_block {apisix.http_admin()
}
}
依据路由进行转发
local ok = router:dispatch(get_var("uri"), {method = get_method()})
具体路由 dispatch 逻辑
function _M.dispatch(self, path, opts, ...)
if type(path) ~= "string" then
error("invalid argument path", 2)
end
local args
local len = select('#', ...)
if len > 0 then
if not self.args then
self.args = {...}
else
clear_tab(self.args)
for i = 1, len do
self.args[i] = select(i, ...)
end
end
-- To keep the self.args in safe,
-- we can't yield until filter_fun is called
args = self.args
args[0] = len
end
local route, err = match_route(self, path, opts or empty_table, args)
if not route then
if err then
return nil, err
end
return nil
end
local handler = route.handler
if not handler or type(handler) ~= "function" then
return nil, "missing handler"
end
handler(...)
return true
end
路由与 hanlder 的关系
local uri_route = {
{paths = [[/apisix/admin/*]],
methods = {"GET", "PUT", "POST", "DELETE", "PATCH"},
handler = run,
},
{paths = [[/apisix/admin/stream_routes/*]],
methods = {"GET", "PUT", "POST", "DELETE", "PATCH"},
handler = run_stream,
},
{paths = [[/apisix/admin/plugins/list]],
methods = {"GET"},
handler = get_plugins_list,
},
{
paths = reload_event,
methods = {"PUT"},
handler = post_reload_plugins,
},
}
调用 run 办法
local function run()
local api_ctx = {}
core.ctx.set_vars_meta(api_ctx)
ngx.ctx.api_ctx = api_ctx
local ok, err = check_token(api_ctx)
if not ok then
core.log.warn("failed to check token:", err)
core.response.exit(401)
end
local uri_segs = core.utils.split_uri(ngx.var.uri)
core.log.info("uri:", core.json.delay_encode(uri_segs))
-- /apisix/admin/schema/route
local seg_res, seg_id = uri_segs[4], uri_segs[5]
local seg_sub_path = core.table.concat(uri_segs, "/", 6)
if seg_res == "schema" and seg_id == "plugins" then
-- /apisix/admin/schema/plugins/limit-count
seg_res, seg_id = uri_segs[5], uri_segs[6]
seg_sub_path = core.table.concat(uri_segs, "/", 7)
end
local resource = resources[seg_res]
if not resource then
core.response.exit(404)
end
local method = str_lower(get_method())
if not resource[method] then
core.response.exit(404)
end
local req_body, err = core.request.get_body(MAX_REQ_BODY)
if err then
core.log.error("failed to read request body:", err)
core.response.exit(400, {error_msg = "invalid request body:" .. err})
end
if req_body then
local data, err = core.json.decode(req_body)
if not data then
core.log.error("invalid request body:", req_body, "err:", err)
core.response.exit(400, {error_msg = "invalid request body:" .. err,
req_body = req_body})
end
req_body = data
end
local uri_args = ngx.req.get_uri_args() or {}
if uri_args.ttl then
if not tonumber(uri_args.ttl) then
core.response.exit(400, {error_msg = "invalid argument ttl:"
.. "should be a number"})
end
end
local code, data = resource[method](seg_id, req_body, seg_sub_path,
uri_args)
if code then
data = strip_etcd_resp(data)
core.response.exit(code, data)
end
end
依据 resource 定义找到对应的 lua 模板的执行办法
local resources = {routes = require("apisix.admin.routes"),
services = require("apisix.admin.services"),
upstreams = require("apisix.admin.upstreams"),
consumers = require("apisix.admin.consumers"),
schema = require("apisix.admin.schema"),
ssl = require("apisix.admin.ssl"),
plugins = require("apisix.admin.plugins"),
proto = require("apisix.admin.proto"),
global_rules = require("apisix.admin.global_rules"),
stream_routes = require("apisix.admin.stream_routes"),
plugin_metadata = require("apisix.admin.plugin_metadata"),
plugin_configs = require("apisix.admin.plugin_config"),
}
执行对应模块的对应的办法,例如:
GET http://127.0.0.1:9080/apisix/…
就是间接调用 plugin_metadata 的 get 办法
function _M.get(key)
local path = "/plugin_metadata"
if key then
path = path .. "/" .. key
end
local res, err = core.etcd.get(path, not key)
if not res then
core.log.error("failed to get metadata[", key, "]:", err)
return 503, {error_msg = err}
end
return res.status, res.body
end
control api
入口 apisix.http_control()
server {
listen 127.0.0.1:9090;
access_log off;
location / {
content_by_lua_block {apisix.http_control()
}
}
location @50x.html {
set $from_error_page 'true';
content_by_lua_block {require("apisix.error_handling").handle_500()}
}
}
先注册所有的插件的 control_api 办法,调用 router:dispatch 进行路由散发
function _M.match(uri)
if cached_version ~= plugin_mod.load_times then
local err
router, err = fetch_control_api_router()
if router == nil then
core.log.error("failed to fetch valid api router:", err)
return false
end
cached_version = plugin_mod.load_times
end
core.table.clear(match_opts)
match_opts.method = get_method()
return router:dispatch(uri, match_opts)
end
例如:server_info 插件,注册门路 /v1/server_info 并指定应用 get_server_info 函数进行解决
function _M.control_api()
return {
{methods = {"GET"},
uris ={"/v1/server_info"},
handler = get_server_info,
}
}
end
注册 plugin 的 control_api 办法
for _, plugin in ipairs(plugin_mod.plugins) do
local api_fun = plugin.control_api
if api_fun then
local api_route = api_fun()
register_api_routes(routes, api_route)
end
end
通过 dispatch 办法调用插件的 handler 办法
local route, err = match_route(self, path, opts or empty_table, args)
if not route then
if err then
return nil, err
end
return nil
end
local handler = route.handler
if not handler or type(handler) ~= "function" then
return nil, "missing handler"
end
handler(...)
手撸 apisix 地址:
https://github.com/mousycoder…