Initial commit of working RSS Aggregator build

This commit is contained in:
2026-05-12 17:04:02 -03:00
parent ea3a2ca53e
commit 7ac2f6e384
4962 changed files with 1032666 additions and 0 deletions
@@ -0,0 +1,27 @@
--[[
Add delay marker if needed.
]]
-- Includes
--- @include "getNextDelayedTimestamp"
local function addDelayMarkerIfNeeded(targetKey, delayedKey)
local waitLen = rcall("LLEN", targetKey)
if waitLen <= 1 then
local nextTimestamp = getNextDelayedTimestamp(delayedKey)
if nextTimestamp ~= nil then
-- Check if there is already a marker with older timestamp
-- if there is, we need to replace it.
if waitLen == 1 then
local marker = rcall("LINDEX", targetKey, 0)
local oldTimestamp = tonumber(marker:sub(3))
if oldTimestamp and oldTimestamp > nextTimestamp then
rcall("LSET", targetKey, 0, "0:" .. nextTimestamp)
end
else
-- if there is no marker, then we need to add one
rcall("LPUSH", targetKey, "0:" .. nextTimestamp)
end
end
end
end
+15
View File
@@ -0,0 +1,15 @@
--[[
Function to add job considering priority.
]]
-- Includes
--- @include "addPriorityMarkerIfNeeded"
--- @include "getPriorityScore"
local function addJobWithPriority(waitKey, prioritizedKey, priority, paused, jobId, priorityCounterKey)
local score = getPriorityScore(priority, priorityCounterKey)
rcall("ZADD", prioritizedKey, score, jobId)
if not paused then
addPriorityMarkerIfNeeded(waitKey)
end
end
@@ -0,0 +1,12 @@
--[[
Function priority marker to wait if needed
in order to wake up our workers and to respect priority
order as much as possible
]]
local function addPriorityMarkerIfNeeded(waitKey)
local waitLen = rcall("LLEN", waitKey)
if waitLen == 0 then
rcall("LPUSH", waitKey, "0:0")
end
end
+18
View File
@@ -0,0 +1,18 @@
--[[
Function to loop in batches.
Just a bit of warning, some commands as ZREM
could receive a maximum of 7000 parameters per call.
]]
local function batches(n, batchSize)
local i = 0
return function()
local from = i * batchSize + 1
i = i + 1
if (from <= n) then
local to = math.min(from + batchSize - 1, n)
return from, to
end
end
end
+12
View File
@@ -0,0 +1,12 @@
--[[
Functions to check if a item belongs to a list.
]]
local function checkItemInList(list, item)
for _, v in pairs(list) do
if v == item then
return 1
end
end
return nil
end
+139
View File
@@ -0,0 +1,139 @@
--[[
Move stalled jobs to wait.
Input:
stalledKey 'stalled' (SET)
waitKey 'wait', (LIST)
activeKey 'active', (LIST)
failedKey 'failed', (ZSET)
stalledCheckKey 'stalled-check', (KEY)
metaKey 'meta', (KEY)
pausedKey 'paused', (LIST)
eventStreamKey 'event stream' (STREAM)
maxStalledJobCount Max stalled job count
queueKeyPrefix queue.toKey('')
timestamp timestamp
maxCheckTime max check time
Events:
'stalled' with stalled job id.
]]
local rcall = redis.call
-- Includes
--- @include "batches"
--- @include "getTargetQueueList"
--- @include "removeJob"
--- @include "removeJobsByMaxAge"
--- @include "removeJobsByMaxCount"
--- @include "trimEvents"
-- Check if we need to check for stalled jobs now.
local function checkStalledJobs(stalledKey, waitKey, activeKey, failedKey,
stalledCheckKey, metaKey, pausedKey,
eventStreamKey, maxStalledJobCount,
queueKeyPrefix, timestamp, maxCheckTime)
if rcall("EXISTS", stalledCheckKey) == 1 then return {{}, {}} end
rcall("SET", stalledCheckKey, timestamp, "PX", maxCheckTime)
-- Trim events before emiting them to avoid trimming events emitted in this script
trimEvents(metaKey, eventStreamKey)
-- Move all stalled jobs to wait
local stalling = rcall('SMEMBERS', stalledKey)
local stalled = {}
local failed = {}
if (#stalling > 0) then
rcall('DEL', stalledKey)
local MAX_STALLED_JOB_COUNT = tonumber(maxStalledJobCount)
-- Remove from active list
for i, jobId in ipairs(stalling) do
if string.sub(jobId, 1, 2) == "0:" then
-- If the jobId is a delay marker ID we just remove it.
rcall("LREM", activeKey, 1, jobId)
else
local jobKey = queueKeyPrefix .. jobId
-- Check that the lock is also missing, then we can handle this job as really stalled.
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
-- Remove from the active queue.
local removed = rcall("LREM", activeKey, 1, jobId)
if (removed > 0) then
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
local stalledCount =
rcall("HINCRBY", jobKey, "stalledCounter", 1)
if (stalledCount > MAX_STALLED_JOB_COUNT) then
local rawOpts = rcall("HGET", jobKey, "opts")
local opts = cjson.decode(rawOpts)
local removeOnFailType = type(opts["removeOnFail"])
rcall("ZADD", failedKey, timestamp, jobId)
local failedReason =
"job stalled more than allowable limit"
rcall("HMSET", jobKey, "failedReason", failedReason,
"finishedOn", timestamp)
rcall("XADD", eventStreamKey, "*", "event",
"failed", "jobId", jobId, 'prev', 'active',
'failedReason', failedReason)
if removeOnFailType == "number" then
removeJobsByMaxCount(opts["removeOnFail"],
failedKey, queueKeyPrefix)
elseif removeOnFailType == "boolean" then
if opts["removeOnFail"] then
removeJob(jobId, false, queueKeyPrefix)
rcall("ZREM", failedKey, jobId)
end
elseif removeOnFailType ~= "nil" then
local maxAge = opts["removeOnFail"]["age"]
local maxCount = opts["removeOnFail"]["count"]
if maxAge ~= nil then
removeJobsByMaxAge(timestamp, maxAge,
failedKey, queueKeyPrefix)
end
if maxCount ~= nil and maxCount > 0 then
removeJobsByMaxCount(maxCount, failedKey,
queueKeyPrefix)
end
end
table.insert(failed, jobId)
else
local target =
getTargetQueueList(metaKey, waitKey, pausedKey)
-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
rcall("RPUSH", target, jobId)
rcall("XADD", eventStreamKey, "*", "event",
"waiting", "jobId", jobId, 'prev', 'active')
-- Emit the stalled event
rcall("XADD", eventStreamKey, "*", "event",
"stalled", "jobId", jobId)
table.insert(stalled, jobId)
end
end
end
end
end
end
-- Mark potentially stalled jobs
local active = rcall('LRANGE', activeKey, 0, -1)
if (#active > 0) then
for from, to in batches(#active, 7000) do
rcall('SADD', stalledKey, unpack(active, from, to))
end
end
return {failed, stalled}
end
+47
View File
@@ -0,0 +1,47 @@
--[[
Function to clean job list.
Returns jobIds and deleted count number.
]]
-- Includes
--- @include "getTimestamp"
--- @include "removeJob"
local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd,
timestamp, isWaiting)
local jobs = rcall("LRANGE", listKey, rangeStart, rangeEnd)
local deleted = {}
local deletedCount = 0
local jobTS
local deletionMarker = ''
local jobIdsLen = #jobs
for i, job in ipairs(jobs) do
if limit > 0 and deletedCount >= limit then
break
end
local jobKey = jobKeyPrefix .. job
if (isWaiting or rcall("EXISTS", jobKey .. ":lock") == 0) then
-- Find the right timestamp of the job to compare to maxTimestamp:
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
-- * processedOn represents when the job was last attempted, but it doesn't get populated until
-- the job is first tried
-- * timestamp is the original job submission time
-- Fetch all three of these (in that order) and use the first one that is set so that we'll leave jobs
-- that have been active within the grace period:
jobTS = getTimestamp(jobKey, {"finishedOn", "processedOn", "timestamp"})
if (not jobTS or jobTS <= timestamp) then
-- replace the entry with a deletion marker; the actual deletion will
-- occur at the end of the script
rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)
removeJob(job, true, jobKeyPrefix)
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
end
end
rcall("LREM", listKey, 0, deletionMarker)
return {deleted, deletedCount}
end
+45
View File
@@ -0,0 +1,45 @@
--[[
Function to clean job set.
Returns jobIds and deleted count number.
]]
-- Includes
--- @include "batches"
--- @include "getJobsInZset"
--- @include "getTimestamp"
--- @include "removeJob"
local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attributes, isFinished)
local jobs = getJobsInZset(setKey, rangeEnd, limit)
local deleted = {}
local deletedCount = 0
local jobTS
for i, job in ipairs(jobs) do
if limit > 0 and deletedCount >= limit then
break
end
local jobKey = jobKeyPrefix .. job
if isFinished then
removeJob(job, true, jobKeyPrefix)
deletedCount = deletedCount + 1
table.insert(deleted, job)
else
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS <= timestamp) then
removeJob(job, true, jobKeyPrefix)
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
end
end
if(#deleted > 0) then
for from, to in batches(#deleted, 7000) do
rcall("ZREM", setKey, unpack(deleted, from, to))
end
end
return {deleted, deletedCount}
end
+46
View File
@@ -0,0 +1,46 @@
--[[
Functions to collect metrics based on a current and previous count of jobs.
Granualarity is fixed at 1 minute.
]]
--- @include "batches"
local function collectMetrics(metaKey, dataPointsList, maxDataPoints,
timestamp)
-- Increment current count
local count = rcall("HINCRBY", metaKey, "count", 1) - 1
-- Compute how many data points we need to add to the list, N.
local prevTS = rcall("HGET", metaKey, "prevTS")
if not prevTS then
-- If prevTS is nil, set it to the current timestamp
rcall("HSET", metaKey, "prevTS", timestamp, "prevCount", 0)
return
end
local N = math.floor((timestamp - prevTS) / 60000)
if N > 0 then
local delta = count - rcall("HGET", metaKey, "prevCount")
-- If N > 1, add N-1 zeros to the list
if N > 1 then
local points = {}
points[1] = delta
for i = 2, N do
points[i] = 0
end
for from, to in batches(#points, 7000) do
rcall("LPUSH", dataPointsList, unpack(points, from, to))
end
else
-- LPUSH delta to the list
rcall("LPUSH", dataPointsList, delta)
end
-- LTRIM to keep list to its max size
rcall("LTRIM", dataPointsList, 0, maxDataPoints - 1)
-- update prev count with current count
rcall("HSET", metaKey, "prevCount", count, "prevTS", timestamp)
end
end
+12
View File
@@ -0,0 +1,12 @@
--[[
Functions to destructure job key.
Just a bit of warning, these functions may be a bit slow and affect performance significantly.
]]
local getJobIdFromKey = function (jobKey)
return string.match(jobKey, ".*:(.*)")
end
local getJobKeyPrefix = function (jobKey, jobId)
return string.sub(jobKey, 0, #jobKey - #jobId)
end
+70
View File
@@ -0,0 +1,70 @@
--[[
Function to achieve pagination for a set or hash.
This function simulates pagination in the most efficient way possible
for a set using sscan or hscan.
The main limitation is that sets are not order preserving, so the
pagination is not stable. This means that if the set is modified
between pages, the same element may appear in different pages.
]] -- Maximum number of elements to be returned by sscan per iteration.
local maxCount = 100
-- Finds the cursor, and returns the first elements available for the requested page.
local function findPage(key, command, pageStart, pageSize, cursor, offset,
maxIterations, fetchJobs)
local items = {}
local jobs = {}
local iterations = 0
repeat
-- Iterate over the set using sscan/hscan.
local result = rcall(command, key, cursor, "COUNT", maxCount)
cursor = result[1]
local members = result[2]
local step = 1
if command == "HSCAN" then
step = 2
end
if #members == 0 then
-- If the result is empty, we can return the result.
return cursor, offset, items, jobs
end
local chunkStart = offset
local chunkEnd = offset + #members / step
local pageEnd = pageStart + pageSize
if chunkEnd < pageStart then
-- If the chunk is before the page, we can skip it.
offset = chunkEnd
elseif chunkStart > pageEnd then
-- If the chunk is after the page, we can return the result.
return cursor, offset, items, jobs
else
-- If the chunk is overlapping the page, we need to add the elements to the result.
for i = 1, #members, step do
if offset >= pageEnd then
return cursor, offset, items, jobs
end
if offset >= pageStart then
local index = #items + 1
if fetchJobs ~= nil then
jobs[#jobs+1] = rcall("HGETALL", members[i])
end
if step == 2 then
items[index] = {members[i], members[i + 1]}
else
items[index] = members[i]
end
end
offset = offset + 1
end
end
iterations = iterations + 1
until cursor == "0" or iterations >= maxIterations
return cursor, offset, items, jobs
end
+11
View File
@@ -0,0 +1,11 @@
-- We use ZRANGEBYSCORE to make the case where we're deleting a limited number
-- of items in a sorted set only run a single iteration. If we simply used
-- ZRANGE, we may take a long time traversing through jobs that are within the
-- grace period.
local function getJobsInZset(zsetKey, rangeEnd, limit)
if limit > 0 then
return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd, "LIMIT", 0, limit)
else
return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd)
end
end
@@ -0,0 +1,13 @@
--[[
Function to return the next delayed job timestamp.
]]
local function getNextDelayedTimestamp(delayedKey)
local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
if #result then
local nextTimestamp = tonumber(result[2])
if (nextTimestamp ~= nil) then
nextTimestamp = nextTimestamp / 0x1000
end
return nextTimestamp
end
end
+9
View File
@@ -0,0 +1,9 @@
local function getOrSetMaxEvents(metaKey)
local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
if not maxEvents then
maxEvents = 10000
rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
end
return maxEvents
end
+7
View File
@@ -0,0 +1,7 @@
--[[
Function to get priority score.
]]
local function getPriorityScore(priority, priorityCounterKey)
local prioCounter = rcall("INCR", priorityCounterKey)
return priority * 0x100000000 + prioCounter % 0x100000000
end
+14
View File
@@ -0,0 +1,14 @@
local function getRateLimitTTL(maxJobs, rateLimiterKey)
if maxJobs and maxJobs <= tonumber(rcall("GET", rateLimiterKey) or 0) then
local pttl = rcall("PTTL", rateLimiterKey)
if pttl == 0 then
rcall("DEL", rateLimiterKey)
end
if pttl > 0 then
return pttl
end
end
return 0
end
+12
View File
@@ -0,0 +1,12 @@
--[[
Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same).
]]
local function getTargetQueueList(queueMetaKey, waitKey, pausedKey)
if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then
return waitKey, false
else
return pausedKey, true
end
end
+19
View File
@@ -0,0 +1,19 @@
--[[
Function to get the latest saved timestamp.
]]
local function getTimestamp(jobKey, attributes)
if #attributes == 1 then
return rcall("HGET", jobKey, attributes[1])
end
local jobTs
for _, ts in ipairs(rcall("HMGET", jobKey, unpack(attributes))) do
if (ts) then
jobTs = ts
break
end
end
return jobTs
end
+7
View File
@@ -0,0 +1,7 @@
--[[
Function to get ZSet items.
]]
local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end
+33
View File
@@ -0,0 +1,33 @@
--[[
Function to recursively check if there are no locks
on the jobs to be removed.
returns:
boolean
]]
local function isLocked( prefix, jobId, removeChildren)
local jobKey = prefix .. jobId;
-- Check if this job is locked
local lockKey = jobKey .. ':lock'
local lock = rcall("GET", lockKey)
if not lock then
if removeChildren == "1" then
local dependencies = rcall("SMEMBERS", jobKey .. ":dependencies")
if (#dependencies > 0) then
for i, childJobKey in ipairs(dependencies) do
-- We need to get the jobId for this job.
local childJobId = getJobIdFromKey(childJobKey)
local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId)
local result = isLocked( childJobPrefix, childJobId, removeChildren )
if result then
return true
end
end
end
end
return false
end
return true
end
@@ -0,0 +1,13 @@
--[[
Function to move job from prioritized state to active.
]]
local function moveJobFromPriorityToActive(priorityKey, activeKey, priorityCounterKey)
local prioritizedJob = rcall("ZPOPMIN", priorityKey)
if #prioritizedJob > 0 then
rcall("LPUSH", activeKey, prioritizedJob[1])
return prioritizedJob[1]
else
rcall("DEL", priorityCounterKey)
end
end
@@ -0,0 +1,38 @@
--[[
Function to recursively move from waitingChildren to failed.
]]
-- Includes
--- @include "moveParentToWaitIfNeeded"
local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, parentId, jobIdKey, timestamp)
if rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) == 1 then
rcall("ZADD", parentQueueKey .. ":failed", timestamp, parentId)
local failedReason = "child " .. jobIdKey .. " failed"
rcall("HMSET", parentKey, "failedReason", failedReason, "finishedOn", timestamp)
rcall("XADD", parentQueueKey .. ":events", "*", "event", "failed", "jobId", parentId, "failedReason",
failedReason, "prev", "waiting-children")
local rawParentData = rcall("HGET", parentKey, "parent")
if rawParentData ~= false then
local parentData = cjson.decode(rawParentData)
if parentData['fpof'] then
moveParentFromWaitingChildrenToFailed(
parentData['queueKey'],
parentData['queueKey'] .. ':' .. parentData['id'],
parentData['id'],
parentKey,
timestamp
)
elseif parentData['rdof'] then
local grandParentKey = parentData['queueKey'] .. ':' .. parentData['id']
local grandParentDependenciesSet = grandParentKey .. ":dependencies"
if rcall("SREM", grandParentDependenciesSet, parentKey) == 1 then
moveParentToWaitIfNeeded(parentData['queueKey'], grandParentDependenciesSet,
grandParentKey, parentData['id'], timestamp)
end
end
end
end
end
@@ -0,0 +1,42 @@
--[[
Validate and move parent to active if needed.
]]
-- Includes
--- @include "addDelayMarkerIfNeeded"
--- @include "addJobWithPriority"
--- @include "getTargetQueueList"
local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)
local isParentActive = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
local parentWaitKey = parentQueueKey .. ":wait"
local parentTarget, paused = getTargetQueueList(parentQueueKey .. ":meta", parentWaitKey,
parentQueueKey .. ":paused")
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
local priority = tonumber(jobAttributes[1]) or 0
local delay = tonumber(jobAttributes[2]) or 0
if delay > 0 then
local delayedTimestamp = tonumber(timestamp) + delay
local score = delayedTimestamp * 0x1000
local parentDelayedKey = parentQueueKey .. ":delayed"
rcall("ZADD", parentDelayedKey, score, parentId)
rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId,
"delay", delayedTimestamp)
addDelayMarkerIfNeeded(parentTarget, parentDelayedKey)
else
if priority == 0 then
rcall("RPUSH", parentTarget, parentId)
else
addJobWithPriority(parentWaitKey, parentQueueKey .. ":prioritized", priority, paused,
parentId, parentQueueKey .. ":pc")
end
rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId,
"prev", "waiting-children")
end
end
end
@@ -0,0 +1,73 @@
--[[
Function to move job from wait state to active.
Input:
keys[1] wait key
keys[2] active key
keys[3] prioritized key
keys[4] stream events key
keys[5] stalled key
-- Rate limiting
keys[6] rate limiter key
keys[7] delayed key
keys[8] paused key
keys[9] meta key
keys[10] pc priority counter
opts - token - lock token
opts - lockDuration
opts - limiter
]]
-- Includes
--- @include "pushBackJobWithPriority"
local function prepareJobForProcessing(keys, keyPrefix, targetKey, jobId, processedOn,
maxJobs, expireTime, opts)
local jobKey = keyPrefix .. jobId
-- Check if we need to perform rate limiting.
if maxJobs then
local rateLimiterKey = keys[6];
-- check if we exceeded rate limit, we need to remove the job and return expireTime
if expireTime > 0 then
-- remove from active queue and add back to the wait list
rcall("LREM", keys[2], 1, jobId)
local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0
if priority == 0 then
rcall("RPUSH", targetKey, jobId)
else
pushBackJobWithPriority(keys[3], priority, jobId)
end
-- Return when we can process more jobs
return {0, 0, expireTime, 0}
end
local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
if jobCounter == 1 then
local limiterDuration = opts['limiter'] and opts['limiter']['duration']
local integerDuration = math.floor(math.abs(limiterDuration))
rcall("PEXPIRE", rateLimiterKey, integerDuration)
end
end
local lockKey = jobKey .. ':lock'
-- get a lock
if opts['token'] ~= "0" then
rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
end
rcall("XADD", keys[4], "*", "event", "active", "jobId", jobId, "prev", "waiting")
rcall("HSET", jobKey, "processedOn", processedOn)
rcall("HINCRBY", jobKey, "attemptsMade", 1)
return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
end
+44
View File
@@ -0,0 +1,44 @@
--[[
Updates the delay set, by moving delayed jobs that should
be processed now to "wait".
Events:
'waiting'
]]
-- Includes
--- @include "addPriorityMarkerIfNeeded"
--- @include "getPriorityScore"
-- Try to get as much as 1000 jobs at once
local function promoteDelayedJobs(delayedKey, waitKey, targetKey, prioritizedKey,
eventStreamKey, prefix, timestamp, paused, priorityCounterKey)
local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000, "LIMIT", 0, 1000)
if (#jobs > 0) then
rcall("ZREM", delayedKey, unpack(jobs))
for _, jobId in ipairs(jobs) do
local jobKey = prefix .. jobId
local priority =
tonumber(rcall("HGET", jobKey, "priority")) or 0
if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", targetKey, jobId)
else
local score = getPriorityScore(priority, priorityCounterKey)
rcall("ZADD", prioritizedKey, score, jobId)
end
-- Emit waiting event
rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
jobId, "prev", "delayed")
rcall("HSET", jobKey, "delay", 0)
end
if not paused then
addPriorityMarkerIfNeeded(targetKey)
end
end
end
@@ -0,0 +1,9 @@
--[[
Function to push back job considering priority in front of same prioritized jobs.
]]
local function pushBackJobWithPriority(prioritizedKey, priority, jobId)
-- in order to put it at front of same prioritized jobs
-- we consider prioritized counter as 0
local score = priority * 0x100000000
rcall("ZADD", prioritizedKey, score, jobId)
end
+13
View File
@@ -0,0 +1,13 @@
--[[
Function to remove job.
]]
-- Includes
--- @include "removeParentDependencyKey"
local function removeJob(jobId, hard, baseKey)
local jobKey = baseKey .. jobId
removeParentDependencyKey(jobKey, hard, nil, baseKey)
rcall("DEL", jobKey, jobKey .. ':logs',
jobKey .. ':dependencies', jobKey .. ':processed')
end
@@ -0,0 +1,35 @@
--[[
Function to remove from any state.
returns:
prev state
]]
local function removeJobFromAnyState( prefix, jobId)
-- We start with the ZSCORE checks, since they have O(1) complexity
if rcall("ZSCORE", prefix .. "completed", jobId) then
rcall("ZREM", prefix .. "completed", jobId)
return "completed"
elseif rcall("ZSCORE", prefix .. "waiting-children", jobId) then
rcall("ZREM", prefix .. "waiting-children", jobId)
return "waiting-children"
elseif rcall("ZSCORE", prefix .. "delayed", jobId) then
rcall("ZREM", prefix .. "delayed", jobId)
return "delayed"
elseif rcall("ZSCORE", prefix .. "failed", jobId) then
rcall("ZREM", prefix .. "failed", jobId)
return "failed"
elseif rcall("ZSCORE", prefix .. "prioritized", jobId) then
rcall("ZREM", prefix .. "prioritized", jobId)
return "prioritized"
-- We remove only 1 element from the list, since we assume they are not added multiple times
elseif rcall("LREM", prefix .. "wait", 1, jobId) == 1 then
return "wait"
elseif rcall("LREM", prefix .. "paused", 1, jobId) == 1 then
return "paused"
elseif rcall("LREM", prefix .. "active", 1, jobId) == 1 then
return "active"
end
return "unknown"
end
+13
View File
@@ -0,0 +1,13 @@
--[[
Functions to remove jobs.
]]
-- Includes
--- @include "removeJob"
local function removeJobs(keys, hard, baseKey, max)
for i, key in ipairs(keys) do
removeJob(key, hard, baseKey)
end
return max - #keys
end
+15
View File
@@ -0,0 +1,15 @@
--[[
Functions to remove jobs by max age.
]]
-- Includes
--- @include "removeJob"
local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix)
local start = timestamp - maxAge * 1000
local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf")
for i, jobId in ipairs(jobIds) do
removeJob(jobId, false, prefix)
end
rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start)
end
@@ -0,0 +1,15 @@
--[[
Functions to remove jobs by max count.
]]
-- Includes
--- @include "removeJob"
local function removeJobsByMaxCount(maxCount, targetSet, prefix)
local start = maxCount
local jobIds = rcall("ZREVRANGE", targetSet, start, -1)
for i, jobId in ipairs(jobIds) do
removeJob(jobId, false, prefix)
end
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1))
end
+17
View File
@@ -0,0 +1,17 @@
--[[
Functions to remove jobs.
]]
-- Includes
--- @include "removeJobs"
local function getListItems(keyName, max)
return rcall('LRANGE', keyName, 0, max - 1)
end
local function removeListJobs(keyName, hard, baseKey, max)
local jobs = getListItems(keyName, max)
local count = removeJobs(jobs, hard, baseKey, max)
rcall("LTRIM", keyName, #jobs, -1)
return count
end
@@ -0,0 +1,77 @@
--[[
Check if this job has a parent. If so we will just remove it from
the parent child list, but if it is the last child we should move the parent to "wait/paused"
which requires code from "moveToFinished"
]]
--- @include "destructureJobKey"
--- @include "getTargetQueueList"
local function moveParentToWait(parentPrefix, parentId, emitEvent)
local parentTarget = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait", parentPrefix .. "paused")
rcall("RPUSH", parentTarget, parentId)
if emitEvent then
local parentEventStream = parentPrefix .. "events"
rcall("XADD", parentEventStream, "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
end
end
local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
if parentKey then
local parentDependenciesKey = parentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if result > 0 then
local pendingDependencies = rcall("SCARD", parentDependenciesKey)
if pendingDependencies == 0 then
local parentId = getJobIdFromKey(parentKey)
local parentPrefix = getJobKeyPrefix(parentKey, parentId)
local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
if numRemovedElements == 1 then
if hard then
if parentPrefix == baseKey then
removeParentDependencyKey(parentKey, hard, nil, baseKey)
rcall("DEL", parentKey, parentKey .. ':logs',
parentKey .. ':dependencies', parentKey .. ':processed')
else
moveParentToWait(parentPrefix, parentId)
end
else
moveParentToWait(parentPrefix, parentId, true)
end
end
end
end
else
local missedParentKey = rcall("HGET", jobKey, "parentKey")
if( (type(missedParentKey) == "string") and missedParentKey ~= "" and (rcall("EXISTS", missedParentKey) == 1)) then
local parentDependenciesKey = missedParentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if result > 0 then
local pendingDependencies = rcall("SCARD", parentDependenciesKey)
if pendingDependencies == 0 then
local parentId = getJobIdFromKey(missedParentKey)
local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)
local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
if numRemovedElements == 1 then
if hard then
if parentPrefix == baseKey then
removeParentDependencyKey(missedParentKey, hard, nil, baseKey)
rcall("DEL", missedParentKey, missedParentKey .. ':logs',
missedParentKey .. ':dependencies', missedParentKey .. ':processed')
else
moveParentToWait(parentPrefix, parentId)
end
else
moveParentToWait(parentPrefix, parentId, true)
end
end
end
end
end
end
end
+15
View File
@@ -0,0 +1,15 @@
-- Includes
--- @include "batches"
--- @include "getZSetItems"
--- @include "removeJobs"
local function removeZSetJobs(keyName, hard, baseKey, max)
local jobs = getZSetItems(keyName, max)
local count = removeJobs(jobs, hard, baseKey, max)
if(#jobs > 0) then
for from, to in batches(#jobs, 7000) do
rcall("ZREM", keyName, unpack(jobs, from, to))
end
end
return count
end
+30
View File
@@ -0,0 +1,30 @@
--[[
Function to store a job
]]
local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
parentKey, parentData, repeatJobKey)
local jsonOpts = cjson.encode(opts)
local delay = opts['delay'] or 0
local priority = opts['priority'] or 0
local optionalValues = {}
if parentKey ~= nil then
table.insert(optionalValues, "parentKey")
table.insert(optionalValues, parentKey)
table.insert(optionalValues, "parent")
table.insert(optionalValues, parentData)
end
if repeatJobKey ~= nil then
table.insert(optionalValues, "rjk")
table.insert(optionalValues, repeatJobKey)
end
rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
"timestamp", timestamp, "delay", delay, "priority", priority,
unpack(optionalValues))
rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
return delay, priority
end
+12
View File
@@ -0,0 +1,12 @@
--[[
Function to trim events, default 10000.
]]
local function trimEvents(metaKey, eventStreamKey)
local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
if maxEvents ~= false then
rcall("XTRIM", eventStreamKey, "MAXLEN", "~", maxEvents)
else
rcall("XTRIM", eventStreamKey, "MAXLEN", "~", 10000)
end
end
@@ -0,0 +1,25 @@
--- @include "updateParentDepsIfNeeded"
--[[
This function is used to update the parent's dependencies if the job
is already completed and about to be ignored. The parent must get its
dependencies updated to avoid the parent job being stuck forever in
the waiting-children state.
]]
local function updateExistingJobsParent(parentKey, parent, parentData,
parentDependenciesKey, completedKey,
jobIdKey, jobId, timestamp)
if parentKey ~= nil then
if rcall("ZSCORE", completedKey, jobId) ~= false then
local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
updateParentDepsIfNeeded(parentKey, parent['queueKey'],
parentDependenciesKey, parent['id'],
jobIdKey, returnvalue, timestamp)
else
if parentDependenciesKey ~= nil then
rcall("SADD", parentDependenciesKey, jobIdKey)
end
end
rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
end
end
@@ -0,0 +1,13 @@
--[[
Validate and move or add dependencies to parent.
]]
-- Includes
--- @include "moveParentToWaitIfNeeded"
local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
parentId, jobIdKey, returnvalue, timestamp )
local processedSet = parentKey .. ":processed"
rcall("HSET", processedSet, jobIdKey, returnvalue)
moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)
end