diff --git a/prometheus_keys.lua b/prometheus_keys.lua index 45d8066..fc7c053 100644 --- a/prometheus_keys.lua +++ b/prometheus_keys.lua @@ -91,9 +91,20 @@ function KeyIndex:list() self:sync() local copy = {} local i = 1 - for _, v in pairs(self.keys) do - copy[i] = v - i = i + 1 + -- Emit a key only from the slot the index currently points at + -- (self.index[key] == idx). self.keys can transiently hold the same key value + -- in two different slots (e.g. when an expired metric is re-added at a new + -- slot before the old slot is reclaimed); listing the raw self.keys values + -- would emit duplicate metrics. Consulting the index guarantees each key is + -- listed exactly once, at its canonical slot. Iterating self.keys (not + -- 0..self.last) keeps this O(live keys): self.last grows monotonically with + -- every add and is never reclaimed, so a slot range scan would walk every + -- dead slot ever created on long-lived, high-churn workers. + for idx, key in pairs(self.keys) do + if self.index[key] == idx then + copy[i] = key + i = i + 1 + end end return copy end @@ -118,14 +129,32 @@ function KeyIndex:add(key_or_keys, err_msg_lru_eviction, exptime) -- key already exists, if has exptime, set expire local expired = false if exptime then - local ok = self.dict:expire(self.key_prefix .. self.index[key], exptime) - -- if key has already expired, remove it from the index + local ok, err = self.dict:expire(self.key_prefix .. self.index[key], exptime) if not ok then - local idx = self.index[key] - self.index[key] = nil - self.keys[idx] = nil - self.expire_keys[idx] = nil - expired = true + if err == "not found" then + -- The slot already expired in the shared dict. Drop the stale + -- local state and bump delete_count so other workers do a full + -- sync and reclaim the slot; without this the old slot lingers in + -- their local self.keys while the metric is re-added at a new slot, + -- desynchronizing the index and causing duplicate metric emission. + -- The dict slot is already gone (expire returned "not found"), so + -- there is no slot to clear here. + local idx = self.index[key] + self.index[key] = nil + self.keys[idx] = nil + self.expire_keys[idx] = nil + self.deleted = self.deleted + 1 + local _, incr_err, forcible = self.dict:incr(self.delete_count, 1, 0) + if incr_err or forcible then + return incr_err or err_msg_lru_eviction + end + expired = true + else + -- Unexpected expire error: the slot may still be live, so leave it + -- as-is rather than re-adding it, which would create a duplicate. + ngx.log(ngx.ERR, "failed to renew expire for key '", key, "': ", + tostring(err)) + end end end if not expired then diff --git a/prometheus_test.lua b/prometheus_test.lua index 0fbc0b0..814b7a8 100644 --- a/prometheus_test.lua +++ b/prometheus_test.lua @@ -58,6 +58,7 @@ function SimpleDict:expire(k, exptime) return nil, "not found" end self.dict[k]["expired"] = os.time() + exptime + return true -- match ngx.shared.DICT:expire, which returns true on success end function SimpleDict:ttl(k) self:get(k) @@ -804,6 +805,78 @@ function TestKeyIndex:testList() luaunit.assertEquals(keys[3], "key3") end +-- Regression test for apache/apisix#11934 (duplicate metrics). +-- A key with an exptime is added and synced (self.last now tracks N). The key +-- then expires in the underlying shared dict without going through remove(), so +-- neither key_count nor delete_count changes and the next sync() is a no-op, +-- leaving self.index still pointing at the now-vanished slot. Re-adding the key +-- therefore takes the "expired" path in add() and allocates a new slot while +-- the stale slot lingers in self.keys. Before the fix, delete_count was not +-- bumped (other workers never re-synced) and list() iterated self.keys, so the +-- same key was emitted twice -> duplicate metrics. +function TestKeyIndex:testExpiredReAddNoDuplicate() + local err = self.key_index:add("expkey", "eviction_err", 1) + luaunit.assertEquals(err, nil) + self.key_index:sync() + luaunit.assertEquals(self.dict:get("_prefix_key_count"), 1) + luaunit.assertEquals(self.dict:get("_prefix_key_1"), "expkey") + luaunit.assertEquals(self.dict:get("_prefix_delete_count"), nil) + luaunit.assertEquals(self.key_index.index["expkey"], 1) + + -- A second worker sharing the same shared dict syncs the initial state, so it + -- now holds slot 1 in its local self.keys/index. This is the worker that the + -- delete_count bump must later force to re-sync and reclaim the stale slot. + local worker2 = require('prometheus_keys').new(self.dict, "_prefix_", 1) + worker2:sync() + luaunit.assertEquals(worker2.index["expkey"], 1) + luaunit.assertEquals(#worker2:list(), 1) + + -- Let the key expire in the underlying shared dict. key_count and + -- delete_count are untouched, so the next sync() inside add() is a no-op and + -- the local index keeps pointing at the (now gone) slot 1. + sleep(2) + luaunit.assertEquals(self.dict:get("_prefix_key_1"), nil) + + -- Re-adding the now-expired key takes the expired branch and allocates slot 2. + err = self.key_index:add("expkey", "eviction_err", 1) + luaunit.assertEquals(err, nil) + luaunit.assertEquals(self.dict:get("_prefix_key_2"), "expkey") + + -- delete_count must have been bumped on the expired re-add path so that + -- other workers do a full sync and drop the stale slot. + luaunit.assertEquals(self.dict:get("_prefix_delete_count"), 1) + + -- list() must report the key exactly once, not twice. + local keys = self.key_index:list() + luaunit.assertEquals(#keys, 1) + luaunit.assertEquals(keys[1], "expkey") + + -- The second worker must converge: its next sync() sees the bumped + -- delete_count, does a full sync, drops the stale slot 1 and picks up slot 2. + -- Without the delete_count bump it would keep slot 1 forever and list() the + -- key twice. + worker2:sync() + luaunit.assertEquals(worker2.keys[1], nil) + luaunit.assertEquals(worker2.index["expkey"], 2) + local keys2 = worker2:list() + luaunit.assertEquals(#keys2, 1) + luaunit.assertEquals(keys2[1], "expkey") +end + +-- list() must de-duplicate when self.keys transiently holds the same key in two +-- slots (the window before a stale slot is reclaimed): only the slot the index +-- points at is canonical. This isolates the list() guard from the add() path. +function TestKeyIndex:testListDeduplicatesStaleSlot() + self.key_index.keys = {[1] = "dup", [2] = "dup"} + self.key_index.index = {dup = 2} + self.key_index.last = 2 + + local keys = self.key_index:list() + luaunit.assertEquals(ngx.logs, nil) + luaunit.assertEquals(#keys, 1) + luaunit.assertEquals(keys[1], "dup") +end + function TestKeyIndex:testSync() self.key_index:sync() luaunit.assertEquals(ngx.logs, nil)