class JobTask < ActiveRecord::Base
  TASK_TYPES = %w(mass_tag_edit approve_tag_alias approve_tag_implication calculate_favorite_tags upload_posts_to_mirrors periodic_maintenance)
  STATUSES = %w(pending processing finished error)

  validates_inclusion_of :task_type, :in => TASK_TYPES
  validates_inclusion_of :status, :in => STATUSES
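
  # Task parameters are stored as JSON in the data_as_json column; the
  # data/data= accessors below (de)serialize that payload on the fly.
  #
  # Illustrative only (actual call sites live elsewhere in the app) -- a task
  # row might be enqueued roughly like:
  #   JobTask.create(:task_type => "mass_tag_edit", :status => "pending",
  #                  :data => {"start_tags" => "foo", "result_tags" => "bar",
  #                            "updater_id" => 1, "updater_ip_addr" => "127.0.0.1"})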
  def data
    JSON.parse(data_as_json)
  end

  def data=(hoge)
    self.data_as_json = hoge.to_json
  end
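
  # Runs this task once: disables the database statement timeout, marks the
  # task as processing, dispatches to the matching execute_<task_type> method,
  # then marks it finished or re-queues it as pending while repeat_count says
  # it should run again (a negative repeat_count repeats indefinitely).
  # SystemExit is re-raised so a shutdown is not swallowed; any other
  # exception is recorded in status_message with status "error".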
  def execute!
    if repeat_count > 0
      count = repeat_count - 1
    else
      count = repeat_count
    end

    begin
      execute_sql("SET statement_timeout = 0")
      update_attributes(:status => "processing")
      __send__("execute_#{task_type}")

      if count == 0
        update_attributes(:status => "finished")
      else
        update_attributes(:status => "pending", :repeat_count => count)
      end
    rescue SystemExit => x
      update_attributes(:status => "pending")
      raise x
    rescue Exception => x
      update_attributes(:status => "error", :status_message => "#{x.class}: #{x}")
    end
  end
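
  # Applies a queued mass tag edit (via Tag.mass_edit) using the parameters
  # stored in the task data.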
  def execute_mass_tag_edit
    start_tags = data["start_tags"]
    result_tags = data["result_tags"]
    updater_id = data["updater_id"]
    updater_ip_addr = data["updater_ip_addr"]
    Tag.mass_edit(start_tags, result_tags, updater_id, updater_ip_addr)
  end
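
  # Approves the tag alias referenced by data["id"] on behalf of the stored
  # updater.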
  def execute_approve_tag_alias
    ta = TagAlias.find(data["id"])
    updater_id = data["updater_id"]
    updater_ip_addr = data["updater_ip_addr"]
    ta.approve(updater_id, updater_ip_addr)
  end
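
  # Approves the tag implication referenced by data["id"] on behalf of the
  # stored updater.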
  def execute_approve_tag_implication
    ti = TagImplication.find(data["id"])
    updater_id = data["updater_id"]
    updater_ip_addr = data["updater_ip_addr"]
    ti.approve(updater_id, updater_ip_addr)
  end
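
  # Recalculates favorite tags via FavoriteTag.process_all, throttled by a
  # short-lived cache key to at most once per ten minutes. The call is seeded
  # with the post id recorded by the previous run (or the newest post id on
  # the first run), and the current maximum post id is stored back into the
  # task data afterwards.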
  def execute_calculate_favorite_tags
    return if Cache.get("delay-favtags-calc")

    last_processed_post_id = data["last_processed_post_id"].to_i

    if last_processed_post_id == 0
      last_processed_post_id = Post.maximum("id").to_i
    end

    Cache.put("delay-favtags-calc", "1", 10.minutes)
    FavoriteTag.process_all(last_processed_post_id)
    update_attributes(:data => {"last_processed_post_id" => Post.maximum("id")})
  end
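
  # Merges the given hash into the task data and persists it, letting
  # long-running tasks report their progress.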
  def update_data(*args)
    hash = data.merge(args[0])
    update_attributes(:data => hash)
  end
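
  # Housekeeping that reschedules itself every six hours: recalculates the
  # cached post row count, recalculates per-tag post counts, and purges old
  # tags, recording the current step in the task data as it goes.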
  def execute_periodic_maintenance
    return if data["next_run"] && data["next_run"] > Time.now.to_i

    update_data("step" => "recalculating post count")
    Post.recalculate_row_count
    update_data("step" => "recalculating tag post counts")
    Tag.recalculate_post_count
    update_data("step" => "purging old tags")
    Tag.purge_tags

    update_data("next_run" => Time.now.to_i + 60*60*6, "step" => nil)
  end
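
  # Uploads up to ten not-yet-warehoused posts per run, recording the post
  # currently being uploaded and the number of posts still waiting ("left")
  # in the task data.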
  def execute_upload_posts_to_mirrors
    # This is a little counterintuitive: if we're backlogged, mirror newer posts first,
    # since they're the ones that receive the most attention. Mirror held posts after
    # unheld posts.
    #
    # Apply a limit, so if we're backlogged heavily, we'll only upload a few posts and
    # then give other jobs a chance to run.
    data = {}
    (1..10).each do
      post = Post.find(:first, :conditions => ["NOT is_warehoused AND status <> 'deleted'"], :order => "is_held ASC, index_timestamp DESC")
      break if not post

      data["left"] = Post.count(:conditions => ["NOT is_warehoused AND status <> 'deleted'"])
      data["post_id"] = post.id
      update_attributes(:data => data)

      begin
        post.upload_to_mirrors
      ensure
        data["post_id"] = nil
        update_attributes(:data => data)
      end

      data["left"] = Post.count(:conditions => ["NOT is_warehoused AND status <> 'deleted'"])
      update_attributes(:data => data)
    end
  end
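
  # Builds a human-readable, per-task-type summary of the task's parameters
  # and current progress.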
  def pretty_data
    case task_type
    when "mass_tag_edit"
      start = data["start_tags"]
      result = data["result_tags"]
      user = User.find_name(data["updater_id"])

      "start:#{start} result:#{result} user:#{user}"

    when "approve_tag_alias"
      ta = TagAlias.find(data["id"])
      "start:#{ta.name} result:#{ta.alias_name}"

    when "approve_tag_implication"
      ti = TagImplication.find(data["id"])
      "start:#{ti.predicate.name} result:#{ti.consequent.name}"

    when "calculate_favorite_tags"
      "post_id:#{data['last_processed_post_id']}"

    when "upload_posts_to_mirrors"
      ret = ""
      if data["post_id"]
        ret << "uploading post_id #{data["post_id"]}"
      elsif data["left"]
        ret << "sleeping"
      else
        ret << "idle"
      end
      ret << (" (%i left) " % data["left"]) if data["left"]
      ret

    when "periodic_maintenance"
      if status == "processing" then
        data["step"]
      elsif status != "error" then
        next_run = (data["next_run"] or 0) - Time.now.to_i
        next_run_in_minutes = next_run.to_i / 60
        if next_run_in_minutes > 0
          eta = "next run in #{(next_run_in_minutes.to_f / 60.0).round} hours"
        else
          eta = "next run imminent"
        end
        "sleeping (#{eta})"
      end
    end
  end
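
  # Executes every pending task once, newest first, pausing one second
  # between tasks.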
  def self.execute_once
    find(:all, :conditions => ["status = ?", "pending"], :order => "id desc").each do |task|
      task.execute!
      sleep 1
    end
  end
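
  # Long-running entry point: resets tasks stranded in "processing" by an
  # earlier interruption back to pending, then polls for pending tasks
  # forever.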
  def self.execute_all
    # If we were interrupted without finishing a task, it may be left in processing; reset
    # those tasks to pending.
    find(:all, :conditions => ["status = ?", "processing"]).each do |task|
      task.update_attributes(:status => "pending")
    end

    while true
      execute_once
      sleep 10
    end
  end
end