From f3c7409d7216d18ca510dca9fce3af927df0f1b1 Mon Sep 17 00:00:00 2001 From: Omar Roth Date: Tue, 7 Aug 2018 20:25:59 -0500 Subject: [PATCH] Update refresh_channels to properly utilize workers --- src/invidious.cr | 10 +--------- src/invidious/jobs.cr | 45 ++++++++++++++++++++++++++++++------------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/invidious.cr b/src/invidious.cr index 9b54629f7..ffac2717b 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -83,15 +83,7 @@ crawl_threads.times do end end -total_channels = PG_DB.query_one("SELECT count(*) FROM channels", as: Int64) -channel_threads.times do |i| - limit = total_channels / channel_threads - offset = limit.not_nil! * i - - spawn do - refresh_channels(PG_DB, limit, offset) - end -end +refresh_channels(PG_DB, channel_threads) video_threads.times do |i| spawn do diff --git a/src/invidious/jobs.cr b/src/invidious/jobs.cr index dc44b0290..7d45d7cef 100644 --- a/src/invidious/jobs.cr +++ b/src/invidious/jobs.cr @@ -44,25 +44,44 @@ def crawl_videos(db) end end -def refresh_channels(db, limit = 0, offset = 0) - loop do - db.query("SELECT id FROM channels ORDER BY updated limit $1 offset $2", limit, offset) do |rs| - rs.each do - client = make_client(YT_URL) +def refresh_channels(db, max_threads = 1) + max_channel = Channel(Int32).new - begin + spawn do + max_threads = max_channel.receive + active_threads = 0 + active_channel = Channel(Bool).new + + loop do + db.query("SELECT id FROM channels ORDER BY updated") do |rs| + rs.each do id = rs.read(String) - channel = fetch_channel(id, client, db, false) - db.exec("UPDATE channels SET updated = $1 WHERE id = $2", Time.now, id) - rescue ex - STDOUT << id << " : " << ex.message << "\n" - next + + if active_threads >= max_threads + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + spawn do + begin + client = make_client(YT_URL) + channel = fetch_channel(id, client, db, false) + + db.exec("UPDATE channels SET updated = $1 WHERE id = $2", Time.now, id) + rescue ex + STDOUT << id << " : " << ex.message << "\n" + end + + active_channel.send(true) + end end end end - - Fiber.yield end + + max_channel.send(max_threads) end def refresh_videos(db)