Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 39 additions & 10 deletions prometheus_keys.lua
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,20 @@ function KeyIndex:list()
self:sync()
local copy = {}
local i = 1
for _, v in pairs(self.keys) do
copy[i] = v
i = i + 1
-- Emit a key only from the slot the index currently points at
-- (self.index[key] == idx). self.keys can transiently hold the same key value
-- in two different slots (e.g. when an expired metric is re-added at a new
-- slot before the old slot is reclaimed); listing the raw self.keys values
-- would emit duplicate metrics. Consulting the index guarantees each key is
-- listed exactly once, at its canonical slot. Iterating self.keys (not
-- 0..self.last) keeps this O(live keys): self.last grows monotonically with
-- every add and is never reclaimed, so a slot range scan would walk every
-- dead slot ever created on long-lived, high-churn workers.
for idx, key in pairs(self.keys) do
if self.index[key] == idx then
copy[i] = key
i = i + 1
end
end
return copy
end
Expand All @@ -118,14 +129,32 @@ function KeyIndex:add(key_or_keys, err_msg_lru_eviction, exptime)
-- key already exists, if has exptime, set expire
local expired = false
if exptime then
local ok = self.dict:expire(self.key_prefix .. self.index[key], exptime)
-- if key has already expired, remove it from the index
local ok, err = self.dict:expire(self.key_prefix .. self.index[key], exptime)
if not ok then
local idx = self.index[key]
self.index[key] = nil
self.keys[idx] = nil
self.expire_keys[idx] = nil
expired = true
if err == "not found" then
-- The slot already expired in the shared dict. Drop the stale
-- local state and bump delete_count so other workers do a full
-- sync and reclaim the slot; without this the old slot lingers in
-- their local self.keys while the metric is re-added at a new slot,
-- desynchronizing the index and causing duplicate metric emission.
-- The dict slot is already gone (expire returned "not found"), so
-- there is no slot to clear here.
local idx = self.index[key]
self.index[key] = nil
self.keys[idx] = nil
self.expire_keys[idx] = nil
self.deleted = self.deleted + 1
local _, incr_err, forcible = self.dict:incr(self.delete_count, 1, 0)
if incr_err or forcible then
return incr_err or err_msg_lru_eviction
end
expired = true
else
-- Unexpected expire error: the slot may still be live, so leave it
-- as-is rather than re-adding it, which would create a duplicate.
ngx.log(ngx.ERR, "failed to renew expire for key '", key, "': ",
tostring(err))
end
end
end
if not expired then
Expand Down
73 changes: 73 additions & 0 deletions prometheus_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ function SimpleDict:expire(k, exptime)
return nil, "not found"
end
self.dict[k]["expired"] = os.time() + exptime
return true -- match ngx.shared.DICT:expire, which returns true on success
end
function SimpleDict:ttl(k)
self:get(k)
Expand Down Expand Up @@ -804,6 +805,78 @@ function TestKeyIndex:testList()
luaunit.assertEquals(keys[3], "key3")
end

-- Regression test for apache/apisix#11934 (duplicate metrics).
-- A key with an exptime is added and synced (self.last now tracks N). The key
-- then expires in the underlying shared dict without going through remove(), so
-- neither key_count nor delete_count changes and the next sync() is a no-op,
-- leaving self.index still pointing at the now-vanished slot. Re-adding the key
-- therefore takes the "expired" path in add() and allocates a new slot while
-- the stale slot lingers in self.keys. Before the fix, delete_count was not
-- bumped (other workers never re-synced) and list() iterated self.keys, so the
-- same key was emitted twice -> duplicate metrics.
function TestKeyIndex:testExpiredReAddNoDuplicate()
local err = self.key_index:add("expkey", "eviction_err", 1)
luaunit.assertEquals(err, nil)
self.key_index:sync()
luaunit.assertEquals(self.dict:get("_prefix_key_count"), 1)
luaunit.assertEquals(self.dict:get("_prefix_key_1"), "expkey")
luaunit.assertEquals(self.dict:get("_prefix_delete_count"), nil)
luaunit.assertEquals(self.key_index.index["expkey"], 1)

-- A second worker sharing the same shared dict syncs the initial state, so it
-- now holds slot 1 in its local self.keys/index. This is the worker that the
-- delete_count bump must later force to re-sync and reclaim the stale slot.
local worker2 = require('prometheus_keys').new(self.dict, "_prefix_", 1)
worker2:sync()
luaunit.assertEquals(worker2.index["expkey"], 1)
luaunit.assertEquals(#worker2:list(), 1)

-- Let the key expire in the underlying shared dict. key_count and
-- delete_count are untouched, so the next sync() inside add() is a no-op and
-- the local index keeps pointing at the (now gone) slot 1.
sleep(2)
luaunit.assertEquals(self.dict:get("_prefix_key_1"), nil)

-- Re-adding the now-expired key takes the expired branch and allocates slot 2.
err = self.key_index:add("expkey", "eviction_err", 1)
luaunit.assertEquals(err, nil)
luaunit.assertEquals(self.dict:get("_prefix_key_2"), "expkey")

-- delete_count must have been bumped on the expired re-add path so that
-- other workers do a full sync and drop the stale slot.
luaunit.assertEquals(self.dict:get("_prefix_delete_count"), 1)

-- list() must report the key exactly once, not twice.
local keys = self.key_index:list()
luaunit.assertEquals(#keys, 1)
luaunit.assertEquals(keys[1], "expkey")

-- The second worker must converge: its next sync() sees the bumped
-- delete_count, does a full sync, drops the stale slot 1 and picks up slot 2.
-- Without the delete_count bump it would keep slot 1 forever and list() the
-- key twice.
worker2:sync()
luaunit.assertEquals(worker2.keys[1], nil)
luaunit.assertEquals(worker2.index["expkey"], 2)
local keys2 = worker2:list()
luaunit.assertEquals(#keys2, 1)
luaunit.assertEquals(keys2[1], "expkey")
end

-- list() must de-duplicate when self.keys transiently holds the same key in two
-- slots (the window before a stale slot is reclaimed): only the slot the index
-- points at is canonical. This isolates the list() guard from the add() path.
function TestKeyIndex:testListDeduplicatesStaleSlot()
self.key_index.keys = {[1] = "dup", [2] = "dup"}
self.key_index.index = {dup = 2}
self.key_index.last = 2

local keys = self.key_index:list()
luaunit.assertEquals(ngx.logs, nil)
luaunit.assertEquals(#keys, 1)
luaunit.assertEquals(keys[1], "dup")
end

function TestKeyIndex:testSync()
self.key_index:sync()
luaunit.assertEquals(ngx.logs, nil)
Expand Down
Loading