Skip to content
This repository was archived by the owner on Aug 8, 2019. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,14 @@ RQ Queue Config JSON

A typical config:

config.json
``` json
{"name":"relay","script":"./code/relay_script.rb","num_workers":1,"exec_prefix":""}
```
form.json
``` json
{"default":"hidden","mesg_param1":{"label":"Something Here","help":"foobar"}}
```


** Mandatory Fields **
Expand Down
24 changes: 11 additions & 13 deletions code/adminoper.rb
Original file line number Diff line number Diff line change
@@ -1,49 +1,47 @@
module RQ
class AdminOper

attr_accessor :admin_status
attr_accessor :oper_status

def initialize(pathname)
@pathname = pathname
@dirname = File.dirname(pathname)
@filename = File.basename(pathname)
raise ArgumentError, "#{@dirname} doesn't exist" unless File.directory? @dirname
fail ArgumentError, "#{@dirname} doesn't exist" unless File.directory? @dirname

@down_name = @dirname + "/" + @filename + ".down"
@pause_name = @dirname + "/" + @filename + ".pause"
@down_name = @dirname + '/' + @filename + '.down'
@pause_name = @dirname + '/' + @filename + '.pause'

@admin_status = "UNKNOWN"
@oper_status = "UNKNOWN"
@daemon_status = "UP"
@admin_status = 'UNKNOWN'
@oper_status = 'UNKNOWN'
@daemon_status = 'UP'
end

def update!
if File.exists?(@down_name)
@admin_status = @oper_status = "DOWN"
@admin_status = @oper_status = 'DOWN'
elsif File.exists?(@pause_name)
@admin_status = @oper_status = "PAUSE"
@admin_status = @oper_status = 'PAUSE'
else
@admin_status = @oper_status = "UP"
@admin_status = @oper_status = 'UP'
end
update_status
end

# What the administrator cannot set, only daemons should set this
# What the administrator cannot set, only daemons should set this
def set_daemon_status(stat)
@daemon_status = stat

update_status
end

def update_status
if @daemon_status == "UP"
if @daemon_status == 'UP'
@oper_status = @admin_status
else
@oper_status = @daemon_status
end
end
private :update_status

end
end
64 changes: 31 additions & 33 deletions code/cleaner_script.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env ruby
$:.unshift(File.join(File.dirname(__FILE__), ".."))
$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..'))

require 'vendor/environment'
require 'fileutils'
Expand All @@ -23,8 +23,8 @@ def handle_fail(mesg = 'soft fail')
count = ENV['RQ_COUNT'].to_i

if count > 15
write_status('run', "RQ_COUNT > 15 - failing")
write_status('fail', "RQ_COUNT > 15 - failing")
write_status('run', 'RQ_COUNT > 15 - failing')
write_status('fail', 'RQ_COUNT > 15 - failing')
exit(0)
end

Expand All @@ -43,13 +43,13 @@ def fail_hard(mesg)
################################################################################

def listq(basedir)
queues = Dir.glob(basedir + "/queue/??*")
queues = Dir.glob(basedir + '/queue/??*')
queues
end

def rm_logs_older_than(qname, regex, hours)
Dir.glob(qname + "/#{regex}").each do |f|
if (Time.now-File.mtime(f))/3600 > hours
if (Time.now - File.mtime(f)) / 3600 > hours
begin
puts "status: removing #{f}"
STDOUT.flush
Expand All @@ -65,35 +65,35 @@ def rm_logs_older_than(qname, regex, hours)

def mv_logs(qname)
if File.exists?("#{qname}/queue.log")
a=Time.now
b = sprintf("%s%.2d%.2d.%.2d:%.2d" ,a.year, a.month, a.day, a.hour, a.min)
a = Time.now
b = sprintf('%s%.2d%.2d.%.2d:%.2d' , a.year, a.month, a.day, a.hour, a.min)
puts "status: moving #{qname}/queue.log"
STDOUT.flush
FileUtils.mv("#{qname}/queue.log", "#{qname}/queue.log.#{b}")
end
end

def remove_old(qname, days)
clean_queues = ["/done", "/relayed", "/prep", "/queue"]
clean_queues = ['/done', '/relayed', '/prep', '/queue']
clean_queues.each do |cq|
if File.exists?(qname + cq)

# go by directories and remove any day dir > days + 1
# then go into the hour dirs and remove by time
# easier to whack a whole higher level dir then stat everything below it
Dir.glob(qname + cq + "/????????").each do |x|
if Date.today - Date.strptime(File.basename(x), "%Y%m%d") >= days + 1
puts "status: removing " + x

Dir.glob(qname + cq + '/????????').each do |x|
if Date.today - Date.strptime(File.basename(x), '%Y%m%d') >= days + 1
puts 'status: removing ' + x
STDOUT.flush
FileUtils.rm_rf(x)
elsif Date.today - Date.strptime(File.basename(x), "%Y%m%d") == days
Dir.glob(qname + cq + "/????????/??").each do |y|
elsif Date.today - Date.strptime(File.basename(x), '%Y%m%d') == days
Dir.glob(qname + cq + '/????????/??').each do |y|
if y =~ /(\d{8})\/(\d{2})$/
timstr = $1 + "."+ $2 + ":00:00"
j= DateTime.now - DateTime.strptime(timstr, "%Y%m%d.%H:%M:%S")
timstr = Regexp.last_match[1] + '.' + Regexp.last_match[2] + ':00:00'
j = DateTime.now - DateTime.strptime(timstr, '%Y%m%d.%H:%M:%S')
if j.to_i >= days
puts "status: removing " + y
puts 'status: removing ' + y
STDOUT.flush
FileUtils.rm_rf(y)
end
Expand All @@ -103,14 +103,13 @@ def remove_old(qname, days)
end
end
end

end

def trim_relay(qpath, num)
puts "Trimming Relay to #{num} entries"
STDOUT.flush

all_msgs = RQ::HashDir.entries(qpath + "/relayed")
all_msgs = RQ::HashDir.entries(qpath + '/relayed')

msgs = all_msgs[num..-1]

Expand All @@ -123,32 +122,31 @@ def trim_relay(qpath, num)
msgs.each do
|ent|

path = RQ::HashDir.path_for(qpath + "/relayed", ent)
path = RQ::HashDir.path_for(qpath + '/relayed', ent)

# TODO: put progress
#puts "status: removing " + path
#STDOUT.flush
# puts "status: removing " + path
# STDOUT.flush
FileUtils.rm_rf(path)
end

puts "status: removed #{msgs.length} entries from relayed"
STDOUT.flush
end


##################################################################
# My MAIN
##################################################################
basedir = "/rq/current"
basedir = '/rq/current'

if not ENV.has_key?("RQ_PARAM1")
fail_hard("need to specify a PARAM1")
if not ENV.key?('RQ_PARAM1')
fail_hard('need to specify a PARAM1')
end

if ENV['RQ_PARAM1'] == "ALLQUEUES"
if ENV['RQ_PARAM1'] == 'ALLQUEUES'
queues = listq(basedir)
else
queues = [basedir + "/queue/" + ENV['RQ_PARAM1']]
queues = [basedir + '/queue/' + ENV['RQ_PARAM1']]
if not File.exists?queues[0]
fail_hard("the specified queue #{queues} does not exist")
end
Expand All @@ -161,11 +159,11 @@ def trim_relay(qpath, num)
STDOUT.flush
end
queues.each do |q|
rm_logs_older_than(q, "/queue.log.?*", log_days*24)
rm_logs_older_than(q, '/queue.log.?*', log_days * 24)
mv_logs(q)
remove_old(q, log_days)
end

trim_relay(basedir + "/queue/relay", 60000)
trim_relay(basedir + '/queue/relay', 60_000)

write_status('done', "successfully ran this script")
write_status('done', 'successfully ran this script')
29 changes: 13 additions & 16 deletions code/hashdir.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
module RQ
class HashDir

# For now, the system is not configurable in terms of pattern match or depth

def self.make(path)
FileUtils.mkdir_p(path)
return true
true
end

def self.exist(path, msg_id)
parts = self.msg_id_parts(msg_id)
parts = msg_id_parts(msg_id)
# parts = [ "YYYYmmDD", "HH", "MM" ]

# If we got bad data, return false
Expand All @@ -21,29 +20,29 @@ def self.exist(path, msg_id)
# Do a DFT traverse in reverse order so most
# recent is first
def self.entries(path, limit = nil)
self.entries_int(path, 0, [], limit)
entries_int(path, 0, [], limit)
end

def self.entries_int(path, level, accum, limit = nil)
if level == 0
# YYYYMMDD
ents1 = Dir.glob("#{path}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]")
ents1.sort.reverse.each do |e|
self.entries_int(e, 1, accum, limit)
entries_int(e, 1, accum, limit)
break if limit && accum.length == limit
end
elsif level == 1
# HH
ents1 = Dir.glob("#{path}/[0-9][0-9]")
ents1.sort.reverse.each do |e|
self.entries_int(e, 2, accum, limit)
entries_int(e, 2, accum, limit)
break if limit && accum.length == limit
end
elsif level == 2
# MM
ents1 = Dir.glob("#{path}/[0-9][0-9]")
ents1.sort.reverse.each do |e|
self.entries_int(e, 3, accum, limit)
entries_int(e, 3, accum, limit)
break if limit && accum.length == limit
end
elsif level == 3
Expand All @@ -65,7 +64,7 @@ def self.entries_old(path, limit = nil)
end

def self.num_entries(path)
self.num_entries_int(path, 0)
num_entries_int(path, 0)
end

def self.num_entries_int(path, level)
Expand All @@ -74,19 +73,19 @@ def self.num_entries_int(path, level)
# YYYYMMDD
ents1 = Dir.glob("#{path}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]")
ents1.sort.reverse.each do |e|
sum += self.num_entries_int(e, 1)
sum += num_entries_int(e, 1)
end
elsif level == 1
# HH
ents1 = Dir.glob("#{path}/[0-9][0-9]")
ents1.sort.reverse.each do |e|
sum += self.num_entries_int(e, 2)
sum += num_entries_int(e, 2)
end
elsif level == 2
# MM
ents1 = Dir.glob("#{path}/[0-9][0-9]")
ents1.sort.reverse.each do |e|
sum += self.num_entries_int(e, 3)
sum += num_entries_int(e, 3)
end
elsif level == 3
# MESG-ID
Expand All @@ -102,27 +101,25 @@ def self.num_entries_old(path)
ents1.length
end


def self.inject(prev_msg_path, new_base_path, msg_id)
parts = self.msg_id_parts(msg_id)
parts = msg_id_parts(msg_id)
FileUtils.mkdir_p("#{new_base_path}/#{parts[0]}/#{parts[1]}/#{parts[2]}")
newname = "#{new_base_path}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}"
File.rename(prev_msg_path, newname)
end

def self.path_for(que_base_path, msg_id)
parts = self.msg_id_parts(msg_id)
parts = msg_id_parts(msg_id)
"#{que_base_path}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}"
end

def self.msg_id_parts(msg_id)
# Ex. msg_id 20100625.0127.35.122.7509656
if msg_id =~ /(\d\d\d\d\d\d\d\d)\.(\d\d)(\d\d)/
[$1, $2, $3]
[Regexp.last_match[1], Regexp.last_match[2], Regexp.last_match[3]]
else
nil
end
end

end
end
Loading