|
| 1 | +-- |
| 2 | +-- Licensed to the Apache Software Foundation (ASF) under one or more |
| 3 | +-- contributor license agreements. See the NOTICE file distributed with |
| 4 | +-- this work for additional information regarding copyright ownership. |
| 5 | +-- The ASF licenses this file to You under the Apache License, Version 2.0 |
| 6 | +-- (the "License"); you may not use this file except in compliance with |
| 7 | +-- the License. You may obtain a copy of the License at |
| 8 | +-- |
| 9 | +-- http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +-- |
| 11 | +-- Unless required by applicable law or agreed to in writing, software |
| 12 | +-- distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | +-- See the License for the specific language governing permissions and |
| 15 | +-- limitations under the License. |
| 16 | +-- |
| 17 | + |
| 18 | +local core = require("apisix.core") |
| 19 | +local utils = require("apisix.discovery.zookeeper.utils") |
| 20 | +local schema = require("apisix.discovery.zookeeper.schema") |
| 21 | +local table = require("apisix.core.table") |
| 22 | +local ngx = ngx |
| 23 | +local ipairs = ipairs |
| 24 | +local log = core.log |
| 25 | + |
| 26 | +local _M = { |
| 27 | + version = 0.1, |
| 28 | + priority = 1000, |
| 29 | + name = "zookeeper", |
| 30 | + schema = schema.schema, |
| 31 | +} |
| 32 | + |
| 33 | +-- Global Configuration |
| 34 | +local local_conf |
| 35 | +-- Service Instance Cache(service_name -> {nodes, expire_time}) |
| 36 | +local instance_cache = core.lrucache.new({ |
| 37 | + ttl = 3600, |
| 38 | + count = 1024 |
| 39 | +}) |
| 40 | + |
| 41 | +-- Timer Identifier |
| 42 | +local fetch_timer |
| 43 | + |
| 44 | +-- The instance list of a single service from ZooKeeper |
| 45 | +local function fetch_service_instances(conf, service_name) |
| 46 | + -- 1. Init connect |
| 47 | + local client, err = utils.new_zk_client(conf) |
| 48 | + if not client then |
| 49 | + return nil, err |
| 50 | + end |
| 51 | + |
| 52 | + -- 2. TODO: Create path |
| 53 | + local service_path = conf.root_path .. "/" .. service_name |
| 54 | + local ok, err = utils.create_zk_path(client, service_path) |
| 55 | + if not ok then |
| 56 | + utils.close_zk_client(client) |
| 57 | + return nil, err |
| 58 | + end |
| 59 | + |
| 60 | + -- 3. All instance nodes under a service |
| 61 | + local children, err = client:get_children(service_path) |
| 62 | + if not children then |
| 63 | + utils.close_zk_client(client) |
| 64 | + if err == "not exists" then |
| 65 | + log.warn("service path not exists: ", service_path) |
| 66 | + return {} |
| 67 | + end |
| 68 | + log.error("get zk children failed: ", err) |
| 69 | + return nil, err |
| 70 | + end |
| 71 | + |
| 72 | + -- 4. Parse the data of each instance node one by one |
| 73 | + local instances = {} |
| 74 | + for _, child in ipairs(children) do |
| 75 | + local instance_path = service_path .. "/" .. child |
| 76 | + local data, stat, err = client:get(instance_path) |
| 77 | + if not data then |
| 78 | + log.error("get instance data failed: ", instance_path, " stat:", stat, " err: ", err) |
| 79 | + goto continue |
| 80 | + end |
| 81 | + |
| 82 | + -- Parse instance data |
| 83 | + local instance = utils.parse_instance_data(data) |
| 84 | + if instance then |
| 85 | + table.insert(instances, instance) |
| 86 | + end |
| 87 | + |
| 88 | + ::continue:: |
| 89 | + end |
| 90 | + |
| 91 | + -- 5. Close connects |
| 92 | + utils.close_zk_client(client) |
| 93 | + |
| 94 | + log.debug("fetch service instances: ", service_name, " count: ", #instances) |
| 95 | + return instances |
| 96 | +end |
| 97 | + |
| 98 | +-- Scheduled fetch of all service instances (full cache update)) |
| 99 | +local function fetch_all_services() |
| 100 | + if not local_conf then |
| 101 | + log.warn("zookeeper discovery config not loaded") |
| 102 | + return |
| 103 | + end |
| 104 | + |
| 105 | + -- 1. Init Zookeeper client |
| 106 | + local client, err = utils.new_zk_client(local_conf) |
| 107 | + if not client then |
| 108 | + log.error("init zk client failed: ", err) |
| 109 | + return |
| 110 | + end |
| 111 | + |
| 112 | + -- 2. All instance nodes under a service |
| 113 | + local services, err = client:get_children(local_conf.root_path) |
| 114 | + if not services then |
| 115 | + utils.close_zk_client(client) |
| 116 | + log.error("get zk root children failed: ", err) |
| 117 | + return |
| 118 | + end |
| 119 | + |
| 120 | + -- 3. Fetch the instances of each service and update the cache |
| 121 | + local now = ngx.time() |
| 122 | + for _, service in ipairs(services) do |
| 123 | + local instances, err = fetch_service_instances(local_conf, service) |
| 124 | + if instances then |
| 125 | + instance_cache:set(service, nil, { |
| 126 | + nodes = instances, |
| 127 | + expire_time = now + local_conf.cache_ttl |
| 128 | + }) |
| 129 | + else |
| 130 | + log.error("fetch service instances failed: ", service, " err: ", err) |
| 131 | + end |
| 132 | + end |
| 133 | + |
| 134 | + -- 4. Close connects |
| 135 | + utils.close_zk_client(client) |
| 136 | +end |
| 137 | + |
| 138 | +function _M.nodes(service_name) |
| 139 | + if not service_name or service_name == "" then |
| 140 | + log.error("service name is empty") |
| 141 | + return nil |
| 142 | + end |
| 143 | + |
| 144 | + -- 1. Get from cache |
| 145 | + local cache = instance_cache:get(service_name) |
| 146 | + local now = ngx.time() |
| 147 | + |
| 148 | + -- 2. If the cache is missed or expired, actively pull (the data)) |
| 149 | + if not cache or cache.expire_time < now then |
| 150 | + log.debug("cache miss or expired, fetch from zk: ", service_name) |
| 151 | + local instances, err = fetch_service_instances(local_conf, service_name) |
| 152 | + if not instances then |
| 153 | + log.error("fetch instances failed: ", service_name, " err: ", err) |
| 154 | + -- Fallback: Return the old cache (if available)) |
| 155 | + if cache then |
| 156 | + return cache.nodes |
| 157 | + end |
| 158 | + return nil |
| 159 | + end |
| 160 | + |
| 161 | + -- Update the cache |
| 162 | + cache = { |
| 163 | + nodes = instances, |
| 164 | + expire_time = now + local_conf.cache_ttl |
| 165 | + } |
| 166 | + instance_cache:set(service_name, nil, cache) |
| 167 | + end |
| 168 | + |
| 169 | + return cache.nodes |
| 170 | +end |
| 171 | + |
| 172 | +function _M.check_schema(conf) |
| 173 | + return schema.check(conf) |
| 174 | +end |
| 175 | + |
| 176 | +function _M.init_worker() |
| 177 | + -- Load configuration |
| 178 | + local core_config = core.config.local_conf |
| 179 | + local_conf = core_config.discovery and core_config.discovery.zookeeper or {} |
| 180 | + local ok, err = schema.check(local_conf) |
| 181 | + if not ok then |
| 182 | + log.error("invalid zookeeper discovery config: ", err) |
| 183 | + return |
| 184 | + end |
| 185 | + |
| 186 | + -- The default values |
| 187 | + local_conf.connect_string = local_conf.connect_string or "127.0.0.1:2181" |
| 188 | + local_conf.fetch_interval = local_conf.fetch_interval or 10 |
| 189 | + local_conf.cache_ttl = local_conf.cache_ttl or 30 |
| 190 | + |
| 191 | + -- Start the timer |
| 192 | + if not fetch_timer then |
| 193 | + fetch_timer = ngx.timer.every(local_conf.fetch_interval, fetch_all_services) |
| 194 | + log.info("zk discovery fetch timer started, interval: ", local_conf.fetch_interval, "s") |
| 195 | + end |
| 196 | + |
| 197 | + -- Manually execute a full pull immediately |
| 198 | + ngx.timer.at(0, fetch_all_services) |
| 199 | +end |
| 200 | + |
| 201 | +function _M.destroy() |
| 202 | + if fetch_timer then |
| 203 | + fetch_timer = nil |
| 204 | + end |
| 205 | + instance_cache:flush_all() |
| 206 | + log.info("zookeeper discovery destroyed") |
| 207 | +end |
| 208 | + |
| 209 | +return _M |
0 commit comments