Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,15 @@ install: runtime

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery
$(ENV_INSTALL) apisix/discovery/*.lua $(ENV_INST_LUADIR)/apisix/discovery/
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul,consul_kv,dns,eureka,nacos,kubernetes,tars}
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul,consul_kv,dns,eureka,nacos,kubernetes,tars,zookeeper}
$(ENV_INSTALL) apisix/discovery/consul/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul
$(ENV_INSTALL) apisix/discovery/consul_kv/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul_kv
$(ENV_INSTALL) apisix/discovery/dns/*.lua $(ENV_INST_LUADIR)/apisix/discovery/dns
$(ENV_INSTALL) apisix/discovery/eureka/*.lua $(ENV_INST_LUADIR)/apisix/discovery/eureka
$(ENV_INSTALL) apisix/discovery/kubernetes/*.lua $(ENV_INST_LUADIR)/apisix/discovery/kubernetes
$(ENV_INSTALL) apisix/discovery/nacos/*.lua $(ENV_INST_LUADIR)/apisix/discovery/nacos
$(ENV_INSTALL) apisix/discovery/tars/*.lua $(ENV_INST_LUADIR)/apisix/discovery/tars
$(ENV_INSTALL) apisix/discovery/zookeeper/*.lua $(ENV_INST_LUADIR)/apisix/discovery/zookeeper

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/http
$(ENV_INSTALL) apisix/http/*.lua $(ENV_INST_LUADIR)/apisix/http/
Expand Down
209 changes: 209 additions & 0 deletions apisix/discovery/zookeeper/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local core = require("apisix.core")
local utils = require("apisix.discovery.zookeeper.utils")
local schema = require("apisix.discovery.zookeeper.schema")
local table = require("apisix.core.table")
local ngx = ngx
local ipairs = ipairs
local log = core.log

local _M = {
version = 0.1,
priority = 1000,
name = "zookeeper",
schema = schema.schema,
}

-- Global Configuration
local local_conf
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is recommended that the main process request the service instance from ZooKeeper and synchronize it to the worker processes via the shared dict.

-- Service Instance Cache(service_name -> {nodes, expire_time})
local instance_cache = core.lrucache.new({
ttl = 3600,
count = 1024
})

-- Timer Identifier
local fetch_timer

-- The instance list of a single service from ZooKeeper
local function fetch_service_instances(conf, service_name)
-- 1. Init connect
local client, err = utils.new_zk_client(conf)
if not client then
return nil, err
end

-- 2. TODO: Create path
local service_path = conf.root_path .. "/" .. service_name
local ok, err = utils.create_zk_path(client, service_path)
if not ok then
utils.close_zk_client(client)
return nil, err
end

-- 3. All instance nodes under a service
local children, err = client:get_children(service_path)
if not children then
utils.close_zk_client(client)
if err == "not exists" then
log.warn("service path not exists: ", service_path)
return {}
end
log.error("get zk children failed: ", err)
return nil, err
end

-- 4. Parse the data of each instance node one by one
local instances = {}
for _, child in ipairs(children) do
local instance_path = service_path .. "/" .. child
local data, stat, err = client:get(instance_path)
if not data then
log.error("get instance data failed: ", instance_path, " stat:", stat, " err: ", err)
goto continue
end

-- Parse instance data
local instance = utils.parse_instance_data(data)
if instance then
table.insert(instances, instance)
end

::continue::
end

-- 5. Close connects
utils.close_zk_client(client)

log.debug("fetch service instances: ", service_name, " count: ", #instances)
return instances
end

-- Scheduled fetch of all service instances (full cache update))
local function fetch_all_services()
if not local_conf then
log.warn("zookeeper discovery config not loaded")
return
end

-- 1. Init Zookeeper client
local client, err = utils.new_zk_client(local_conf)
if not client then
log.error("init zk client failed: ", err)
return
end

-- 2. All instance nodes under a service
local services, err = client:get_children(local_conf.root_path)
if not services then
utils.close_zk_client(client)
log.error("get zk root children failed: ", err)
return
end

-- 3. Fetch the instances of each service and update the cache
local now = ngx.time()
for _, service in ipairs(services) do
local instances, err = fetch_service_instances(local_conf, service)
if instances then
instance_cache:set(service, nil, {
nodes = instances,
expire_time = now + local_conf.cache_ttl
})
else
log.error("fetch service instances failed: ", service, " err: ", err)
end
end

-- 4. Close connects
utils.close_zk_client(client)
end

function _M.nodes(service_name)
if not service_name or service_name == "" then
log.error("service name is empty")
return nil
end

-- 1. Get from cache
local cache = instance_cache:get(service_name)
local now = ngx.time()

-- 2. If the cache is missed or expired, actively pull (the data))
if not cache or cache.expire_time < now then
log.debug("cache miss or expired, fetch from zk: ", service_name)
local instances, err = fetch_service_instances(local_conf, service_name)
if not instances then
log.error("fetch instances failed: ", service_name, " err: ", err)
-- Fallback: Return the old cache (if available))
if cache then
return cache.nodes
end
return nil
end

-- Update the cache
cache = {
nodes = instances,
expire_time = now + local_conf.cache_ttl
}
instance_cache:set(service_name, nil, cache)
end

return cache.nodes
end

function _M.check_schema(conf)
return schema.check(conf)
end

function _M.init_worker()
-- Load configuration
local core_config = core.config.local_conf
local_conf = core_config.discovery and core_config.discovery.zookeeper or {}
local ok, err = schema.check(local_conf)
if not ok then
log.error("invalid zookeeper discovery config: ", err)
return
end

-- The default values
local_conf.connect_string = local_conf.connect_string or "127.0.0.1:2181"
local_conf.fetch_interval = local_conf.fetch_interval or 10
local_conf.cache_ttl = local_conf.cache_ttl or 30

-- Start the timer
if not fetch_timer then
fetch_timer = ngx.timer.every(local_conf.fetch_interval, fetch_all_services)
log.info("zk discovery fetch timer started, interval: ", local_conf.fetch_interval, "s")
end

-- Manually execute a full pull immediately
ngx.timer.at(0, fetch_all_services)
end

function _M.destroy()
if fetch_timer then
fetch_timer = nil
end
instance_cache:flush_all()
log.info("zookeeper discovery destroyed")
end

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A dump method is also needed to retrieve all services in the current service discovery instance. You can explore other service discovery modules for more information.

return _M
91 changes: 91 additions & 0 deletions apisix/discovery/zookeeper/schema.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local core = require("apisix.core")

local schema = {
type = "object",
properties = {
-- ZooKeeper Cluster Addresses (separated by commas for multiple addresses)
connect_string = {
type = "string",
default = "127.0.0.1:2181"
},
-- ZooKeeper Session Timeout (milliseconds)
session_timeout = {
type = "integer",
minimum = 1000,
default = 30000
},
-- ZooKeeper Connect Timeout (milliseconds)
connect_timeout = {
type = "integer",
minimum = 1000,
default = 5000
},
-- Service Discovery Root Path
root_path = {
type = "string",
default = "/apisix/discovery/zk"
},
-- Instance Fetch Interval (seconds)
fetch_interval = {
type = "integer",
minimum = 1,
default = 10
},
-- The default weight value for service instances that do not specify a weight in ZooKeeper.
-- It is used for load balancing (higher weight means more traffic).
-- Default value is 100, and the value range is 1-500.
weight = {
type = "integer",
minimum = 1,
default = 100
},
-- ZooKeeper Authentication Information (digest: username:password):
-- Digest authentication credentials for accessing ZooKeeper cluster.
-- Format requirement: "digest:{username}:{password}".
-- Leave empty to disable authentication (not recommended for production).
auth = {
type = "object",
properties = {
type = {type = "string", enum = {"digest"}, default = "digest"},
creds = {type = "string"} -- digest: username:password
}
},
-- Cache Expiration Time (seconds):
-- The time after which service instance cache becomes expired.
-- Default value is 60 seconds
cache_ttl = {
type = "integer",
minimum = 1,
default = 60
}
},
required = {},
additionalProperties = false
}

local _M = {
schema = schema
}

function _M.check(conf)
return core.schema.check(schema, conf)
end

return _M
Loading
Loading