Mirror of https://github.com/iv-org/invidious.git (synced 2025-11-03 22:21:55 +00:00)
	[refactor] Finish converting jobs to new job setup (#1420)
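For context when reading the diff below: the Invidious::Jobs registry and the BaseJob parent class that the new job files inherit from are not part of this commit. A minimal sketch of what the diff assumes they provide (register collects job instances, start_all runs each job's begin loop in its own fiber) might look roughly like this; the actual definitions live elsewhere in the repository:

module Invidious::Jobs
  # Each concrete job (RefreshFeedsJob, NotificationJob, ...) is assumed to
  # subclass this and implement its work loop in #begin.
  abstract class BaseJob
    abstract def begin
  end

  JOBS = [] of BaseJob

  # Collect job instances during startup...
  def self.register(job : BaseJob)
    JOBS << job
  end

  # ...then launch every registered job in its own fiber.
  def self.start_all
    JOBS.each do |job|
      spawn { job.begin }
    end
  end
end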
@@ -160,90 +160,29 @@ end

# Start jobs

Invidious::Jobs.register Invidious::Jobs::RefreshChannelsJob.new(PG_DB, logger, config)
refresh_feeds(PG_DB, logger, config)
subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config)
Invidious::Jobs.register Invidious::Jobs::RefreshFeedsJob.new(PG_DB, logger, config)
Invidious::Jobs.register Invidious::Jobs::SubscribeToFeedsJob.new(PG_DB, logger, config, HMAC_KEY)
Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB)
Invidious::Jobs.register Invidious::Jobs::UpdateDecryptFunctionJob.new

statistics = {
  "error" => "Statistics are not availabile.",
}
if config.statistics_enabled
  spawn do
    statistics = {
      "version"           => "2.0",
      "software"          => SOFTWARE,
      "openRegistrations" => config.registration_enabled,
      "usage"             => {
        "users" => {
          "total"          => PG_DB.query_one("SELECT count(*) FROM users", as: Int64),
          "activeHalfyear" => PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '6 months'", as: Int64),
          "activeMonth"    => PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '1 month'", as: Int64),
        },
      },
      "metadata" => {
        "updatedAt"              => Time.utc.to_unix,
        "lastChannelRefreshedAt" => PG_DB.query_one?("SELECT updated FROM channels ORDER BY updated DESC LIMIT 1", as: Time).try &.to_unix || 0_i64,
      },
    }

    loop do
      sleep 1.minute
      Fiber.yield

      statistics["usage"].as(Hash)["users"].as(Hash)["total"] = PG_DB.query_one("SELECT count(*) FROM users", as: Int64)
      statistics["usage"].as(Hash)["users"].as(Hash)["activeHalfyear"] = PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '6 months'", as: Int64)
      statistics["usage"].as(Hash)["users"].as(Hash)["activeMonth"] = PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '1 month'", as: Int64)
      statistics["metadata"].as(Hash(String, Int64))["updatedAt"] = Time.utc.to_unix
      statistics["metadata"].as(Hash(String, Int64))["lastChannelRefreshedAt"] = PG_DB.query_one?("SELECT updated FROM channels ORDER BY updated DESC LIMIT 1", as: Time).try &.to_unix || 0_i64
    end
  end
  Invidious::Jobs.register Invidious::Jobs::StatisticsRefreshJob.new(PG_DB, config, SOFTWARE)
end

Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB)
if config.captcha_key
  Invidious::Jobs.register Invidious::Jobs::BypassCaptchaJob.new(logger, config)
end

connection_channel = Channel({Bool, Channel(PQ::Notification)}).new(32)
Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(connection_channel, PG_URL)

Invidious::Jobs.start_all

def popular_videos
  Invidious::Jobs::PullPopularVideosJob::POPULAR_VIDEOS.get
end

DECRYPT_FUNCTION = [] of {SigProc, Int32}
spawn do
  update_decrypt_function do |function|
    DECRYPT_FUNCTION.clear
    function.each { |i| DECRYPT_FUNCTION << i }
  end
end

if CONFIG.captcha_key
  spawn do
    bypass_captcha(CONFIG.captcha_key, logger) do |cookies|
      cookies.each do |cookie|
        config.cookies << cookie
      end

      # Persist cookies between runs
      CONFIG.cookies = config.cookies
      File.write("config/config.yml", config.to_yaml)
    end
  end
end

connection_channel = Channel({Bool, Channel(PQ::Notification)}).new(32)
spawn do
  connections = [] of Channel(PQ::Notification)

  PG.connect_listen(PG_URL, "notifications") { |event| connections.each { |connection| connection.send(event) } }

  loop do
    action, connection = connection_channel.receive

    case action
    when true
      connections << connection
    when false
      connections.delete(connection)
    end
  end
end
DECRYPT_FUNCTION = Invidious::Jobs::UpdateDecryptFunctionJob::DECRYPT_FUNCTION

before_all do |env|
  preferences = begin
@@ -3658,12 +3597,7 @@ get "/api/v1/stats" do |env|
    next error_message
  end

  if statistics["error"]?
    env.response.status_code = 500
    next statistics.to_json
  end

  statistics.to_json
  Invidious::Jobs::StatisticsRefreshJob::STATISTICS.to_json
end

# YouTube provides "storyboards", which are sprites containing x * y

(deleted file)
@@ -1,286 +0,0 @@
def refresh_feeds(db, logger, config)
  max_channel = Channel(Int32).new
  spawn do
    max_threads = max_channel.receive
    active_threads = 0
    active_channel = Channel(Bool).new

    loop do
      db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs|
        rs.each do
          email = rs.read(String)
          view_name = "subscriptions_#{sha256(email)}"

          if active_threads >= max_threads
            if active_channel.receive
              active_threads -= 1
            end
          end

          active_threads += 1
          spawn do
            begin
              # Drop outdated views
              column_array = get_column_array(db, view_name)
              ChannelVideo.type_array.each_with_index do |name, i|
                if name != column_array[i]?
                  logger.puts("DROP MATERIALIZED VIEW #{view_name}")
                  db.exec("DROP MATERIALIZED VIEW #{view_name}")
                  raise "view does not exist"
                end
              end

              if !db.query_one("SELECT pg_get_viewdef('#{view_name}')", as: String).includes? "WHERE ((cv.ucid = ANY (u.subscriptions))"
                logger.puts("Materialized view #{view_name} is out-of-date, recreating...")
                db.exec("DROP MATERIALIZED VIEW #{view_name}")
              end

              db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
              db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
            rescue ex
              # Rename old views
              begin
                legacy_view_name = "subscriptions_#{sha256(email)[0..7]}"

                db.exec("SELECT * FROM #{legacy_view_name} LIMIT 0")
                logger.puts("RENAME MATERIALIZED VIEW #{legacy_view_name}")
                db.exec("ALTER MATERIALIZED VIEW #{legacy_view_name} RENAME TO #{view_name}")
              rescue ex
                begin
                  # While iterating through, we may have an email stored from a deleted account
                  if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool)
                    logger.puts("CREATE #{view_name}")
                    db.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(email)}")
                    db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
                  end
                rescue ex
                  logger.puts("REFRESH #{email} : #{ex.message}")
                end
              end
            end

            active_channel.send(true)
          end
        end
      end

      sleep 5.seconds
      Fiber.yield
    end
  end

  max_channel.send(config.feed_threads)
end

def subscribe_to_feeds(db, logger, key, config)
  if config.use_pubsub_feeds
    case config.use_pubsub_feeds
    when Bool
      max_threads = config.use_pubsub_feeds.as(Bool).to_unsafe
    when Int32
      max_threads = config.use_pubsub_feeds.as(Int32)
    end
    max_channel = Channel(Int32).new

    spawn do
      max_threads = max_channel.receive
      active_threads = 0
      active_channel = Channel(Bool).new

      loop do
        db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs|
          rs.each do
            ucid = rs.read(String)

            if active_threads >= max_threads.as(Int32)
              if active_channel.receive
                active_threads -= 1
              end
            end

            active_threads += 1

            spawn do
              begin
                response = subscribe_pubsub(ucid, key, config)

                if response.status_code >= 400
                  logger.puts("#{ucid} : #{response.body}")
                end
              rescue ex
                logger.puts("#{ucid} : #{ex.message}")
              end

              active_channel.send(true)
            end
          end
        end

        sleep 1.minute
        Fiber.yield
      end
    end

    max_channel.send(max_threads.as(Int32))
  end
end

def pull_popular_videos(db)
  loop do
    videos = db.query_all("SELECT DISTINCT ON (ucid) * FROM channel_videos WHERE ucid IN \
      (SELECT channel FROM (SELECT UNNEST(subscriptions) AS channel FROM users) AS d \
      GROUP BY channel ORDER BY COUNT(channel) DESC LIMIT 40) \
      ORDER BY ucid, published DESC", as: ChannelVideo).sort_by { |video| video.published }.reverse

    yield videos

    sleep 1.minute
    Fiber.yield
  end
end

def update_decrypt_function
  loop do
    begin
      decrypt_function = fetch_decrypt_function
      yield decrypt_function
    rescue ex
      # TODO: Log error
      next
    ensure
      sleep 1.minute
      Fiber.yield
    end
  end
end

def bypass_captcha(captcha_key, logger)
  loop do
    begin
      {"/watch?v=CvFH_6DNRCY&gl=US&hl=en&has_verified=1&bpctr=9999999999", produce_channel_videos_url(ucid: "UCXuqSBlHAE6Xw-yeJA0Tunw")}.each do |path|
        response = YT_POOL.client &.get(path)
        if response.body.includes?("To continue with your YouTube experience, please fill out the form below.")
          html = XML.parse_html(response.body)
          form = html.xpath_node(%(//form[@action="/das_captcha"])).not_nil!
          site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"]
          s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"]

          inputs = {} of String => String
          form.xpath_nodes(%(.//input[@name])).map do |node|
            inputs[node["name"]] = node["value"]
          end

          headers = response.cookies.add_request_headers(HTTP::Headers.new)

          response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/createTask", body: {
            "clientKey" => CONFIG.captcha_key,
            "task"      => {
              "type"                => "NoCaptchaTaskProxyless",
              "websiteURL"          => "https://www.youtube.com#{path}",
              "websiteKey"          => site_key,
              "recaptchaDataSValue" => s_value,
            },
          }.to_json).body)

          raise response["error"].as_s if response["error"]?
          task_id = response["taskId"].as_i

          loop do
            sleep 10.seconds

            response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/getTaskResult", body: {
              "clientKey" => CONFIG.captcha_key,
              "taskId"    => task_id,
            }.to_json).body)

            if response["status"]?.try &.== "ready"
              break
            elsif response["errorId"]?.try &.as_i != 0
              raise response["errorDescription"].as_s
            end
          end

          inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s
          headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || ""
          response = YT_POOL.client &.post("/das_captcha", headers, form: inputs)

          yield response.cookies.select { |cookie| cookie.name != "PREF" }
        elsif response.headers["Location"]?.try &.includes?("/sorry/index")
          location = response.headers["Location"].try { |u| URI.parse(u) }
          headers = HTTP::Headers{":authority" => location.host.not_nil!}
          response = YT_POOL.client &.get(location.full_path, headers)

          html = XML.parse_html(response.body)
          form = html.xpath_node(%(//form[@action="index"])).not_nil!
          site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"]
          s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"]

          inputs = {} of String => String
          form.xpath_nodes(%(.//input[@name])).map do |node|
            inputs[node["name"]] = node["value"]
          end

          captcha_client = HTTPClient.new(URI.parse("https://api.anti-captcha.com"))
          captcha_client.family = CONFIG.force_resolve || Socket::Family::INET
          response = JSON.parse(captcha_client.post("/createTask", body: {
            "clientKey" => CONFIG.captcha_key,
            "task"      => {
              "type"                => "NoCaptchaTaskProxyless",
              "websiteURL"          => location.to_s,
              "websiteKey"          => site_key,
              "recaptchaDataSValue" => s_value,
            },
          }.to_json).body)

          raise response["error"].as_s if response["error"]?
          task_id = response["taskId"].as_i

          loop do
            sleep 10.seconds

            response = JSON.parse(captcha_client.post("/getTaskResult", body: {
              "clientKey" => CONFIG.captcha_key,
              "taskId"    => task_id,
            }.to_json).body)

            if response["status"]?.try &.== "ready"
              break
            elsif response["errorId"]?.try &.as_i != 0
              raise response["errorDescription"].as_s
            end
          end

          inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s
          headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || ""
          response = YT_POOL.client &.post("/sorry/index", headers: headers, form: inputs)
          headers = HTTP::Headers{
            "Cookie" => URI.parse(response.headers["location"]).query_params["google_abuse"].split(";")[0],
          }
          cookies = HTTP::Cookies.from_headers(headers)

          yield cookies
        end
      end
    rescue ex
      logger.puts("Exception: #{ex.message}")
    ensure
      sleep 1.minute
      Fiber.yield
    end
  end
end

def find_working_proxies(regions)
  loop do
    regions.each do |region|
      proxies = get_proxies(region).first(20)
      proxies = proxies.map { |proxy| {ip: proxy[:ip], port: proxy[:port]} }
      # proxies = filter_proxies(proxies)

      yield region, proxies
    end

    sleep 1.minute
    Fiber.yield
  end
end

src/invidious/jobs/bypass_captcha_job.cr (new file, 131 lines)
@@ -0,0 +1,131 @@
class Invidious::Jobs::BypassCaptchaJob < Invidious::Jobs::BaseJob
  private getter logger : Invidious::LogHandler
  private getter config : Config

  def initialize(@logger, @config)
  end

  def begin
    loop do
      begin
        {"/watch?v=CvFH_6DNRCY&gl=US&hl=en&has_verified=1&bpctr=9999999999", produce_channel_videos_url(ucid: "UCXuqSBlHAE6Xw-yeJA0Tunw")}.each do |path|
          response = YT_POOL.client &.get(path)
          if response.body.includes?("To continue with your YouTube experience, please fill out the form below.")
            html = XML.parse_html(response.body)
            form = html.xpath_node(%(//form[@action="/das_captcha"])).not_nil!
            site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"]
            s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"]

            inputs = {} of String => String
            form.xpath_nodes(%(.//input[@name])).map do |node|
              inputs[node["name"]] = node["value"]
            end

            headers = response.cookies.add_request_headers(HTTP::Headers.new)

            response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/createTask", body: {
              "clientKey" => config.captcha_key,
              "task"      => {
                "type"                => "NoCaptchaTaskProxyless",
                "websiteURL"          => "https://www.youtube.com#{path}",
                "websiteKey"          => site_key,
                "recaptchaDataSValue" => s_value,
              },
            }.to_json).body)

            raise response["error"].as_s if response["error"]?
            task_id = response["taskId"].as_i

            loop do
              sleep 10.seconds

              response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/getTaskResult", body: {
                "clientKey" => config.captcha_key,
                "taskId"    => task_id,
              }.to_json).body)

              if response["status"]?.try &.== "ready"
                break
              elsif response["errorId"]?.try &.as_i != 0
                raise response["errorDescription"].as_s
              end
            end

            inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s
            headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || ""
            response = YT_POOL.client &.post("/das_captcha", headers, form: inputs)

            response.cookies
              .select { |cookie| cookie.name != "PREF" }
              .each { |cookie| config.cookies << cookie }

            # Persist cookies between runs
            File.write("config/config.yml", config.to_yaml)
          elsif response.headers["Location"]?.try &.includes?("/sorry/index")
            location = response.headers["Location"].try { |u| URI.parse(u) }
            headers = HTTP::Headers{":authority" => location.host.not_nil!}
            response = YT_POOL.client &.get(location.full_path, headers)

            html = XML.parse_html(response.body)
            form = html.xpath_node(%(//form[@action="index"])).not_nil!
            site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"]
            s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"]

            inputs = {} of String => String
            form.xpath_nodes(%(.//input[@name])).map do |node|
              inputs[node["name"]] = node["value"]
            end

            captcha_client = HTTPClient.new(URI.parse("https://api.anti-captcha.com"))
            captcha_client.family = config.force_resolve || Socket::Family::INET
            response = JSON.parse(captcha_client.post("/createTask", body: {
              "clientKey" => config.captcha_key,
              "task"      => {
                "type"                => "NoCaptchaTaskProxyless",
                "websiteURL"          => location.to_s,
                "websiteKey"          => site_key,
                "recaptchaDataSValue" => s_value,
              },
            }.to_json).body)

            raise response["error"].as_s if response["error"]?
            task_id = response["taskId"].as_i

            loop do
              sleep 10.seconds

              response = JSON.parse(captcha_client.post("/getTaskResult", body: {
                "clientKey" => config.captcha_key,
                "taskId"    => task_id,
              }.to_json).body)

              if response["status"]?.try &.== "ready"
                break
              elsif response["errorId"]?.try &.as_i != 0
                raise response["errorDescription"].as_s
              end
            end

            inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s
            headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || ""
            response = YT_POOL.client &.post("/sorry/index", headers: headers, form: inputs)
            headers = HTTP::Headers{
              "Cookie" => URI.parse(response.headers["location"]).query_params["google_abuse"].split(";")[0],
            }
            cookies = HTTP::Cookies.from_headers(headers)

            cookies.each { |cookie| config.cookies << cookie }

            # Persist cookies between runs
            File.write("config/config.yml", config.to_yaml)
          end
        end
      rescue ex
        logger.puts("Exception: #{ex.message}")
      ensure
        sleep 1.minute
        Fiber.yield
      end
    end
  end
end

src/invidious/jobs/notification_job.cr (new file, 24 lines)
@@ -0,0 +1,24 @@
class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
  private getter connection_channel : Channel({Bool, Channel(PQ::Notification)})
  private getter pg_url : URI

  def initialize(@connection_channel, @pg_url)
  end

  def begin
    connections = [] of Channel(PQ::Notification)

    PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) }

    loop do
      action, connection = connection_channel.receive

      case action
      when true
        connections << connection
      when false
        connections.delete(connection)
      end
    end
  end
end

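The NotificationJob now owns the single PG LISTEN connection; other fibers subscribe and unsubscribe by sending a {Bool, Channel(PQ::Notification)} tuple over connection_channel, exactly as the loop above handles it. An illustrative caller (the variable name notify_channel is hypothetical, not from this diff) would look roughly like this:

# Register our own channel with the job, consume events, then deregister.
notify_channel = Channel(PQ::Notification).new

connection_channel.send({true, notify_channel}) # true => add this subscriber
spawn do
  loop do
    event = notify_channel.receive
    # react to the "notifications" payload here
  end
end

# ...when the consumer goes away:
connection_channel.send({false, notify_channel}) # false => remove it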
							
								
								
									
src/invidious/jobs/refresh_feeds_job.cr (new file, 77 lines)
@@ -0,0 +1,77 @@
class Invidious::Jobs::RefreshFeedsJob < Invidious::Jobs::BaseJob
  private getter db : DB::Database
  private getter logger : Invidious::LogHandler
  private getter config : Config

  def initialize(@db, @logger, @config)
  end

  def begin
    max_threads = config.feed_threads
    active_threads = 0
    active_channel = Channel(Bool).new

    loop do
      db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs|
        rs.each do
          email = rs.read(String)
          view_name = "subscriptions_#{sha256(email)}"

          if active_threads >= max_threads
            if active_channel.receive
              active_threads -= 1
            end
          end

          active_threads += 1
          spawn do
            begin
              # Drop outdated views
              column_array = get_column_array(db, view_name)
              ChannelVideo.type_array.each_with_index do |name, i|
                if name != column_array[i]?
                  logger.puts("DROP MATERIALIZED VIEW #{view_name}")
                  db.exec("DROP MATERIALIZED VIEW #{view_name}")
                  raise "view does not exist"
                end
              end

              if !db.query_one("SELECT pg_get_viewdef('#{view_name}')", as: String).includes? "WHERE ((cv.ucid = ANY (u.subscriptions))"
                logger.puts("Materialized view #{view_name} is out-of-date, recreating...")
                db.exec("DROP MATERIALIZED VIEW #{view_name}")
              end

              db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
              db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
            rescue ex
              # Rename old views
              begin
                legacy_view_name = "subscriptions_#{sha256(email)[0..7]}"

                db.exec("SELECT * FROM #{legacy_view_name} LIMIT 0")
                logger.puts("RENAME MATERIALIZED VIEW #{legacy_view_name}")
                db.exec("ALTER MATERIALIZED VIEW #{legacy_view_name} RENAME TO #{view_name}")
              rescue ex
                begin
                  # While iterating through, we may have an email stored from a deleted account
                  if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool)
                    logger.puts("CREATE #{view_name}")
                    db.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(email)}")
                    db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email)
                  end
                rescue ex
                  logger.puts("REFRESH #{email} : #{ex.message}")
                end
              end
            end

            active_channel.send(true)
          end
        end
      end

      sleep 5.seconds
      Fiber.yield
    end
  end
end

src/invidious/jobs/statistics_refresh_job.cr (new file, 59 lines)
@@ -0,0 +1,59 @@
class Invidious::Jobs::StatisticsRefreshJob < Invidious::Jobs::BaseJob
  STATISTICS = {
    "version"  => "2.0",
    "software" => {
      "name"    => "invidious",
      "version" => "",
      "branch"  => "",
    },
    "openRegistrations" => true,
    "usage"             => {
      "users" => {
        "total"          => 0_i64,
        "activeHalfyear" => 0_i64,
        "activeMonth"    => 0_i64,
      },
    },
    "metadata" => {
      "updatedAt"              => Time.utc.to_unix,
      "lastChannelRefreshedAt" => 0_i64,
    },
  }

  private getter db : DB::Database
  private getter config : Config

  def initialize(@db, @config, @software_config : Hash(String, String))
  end

  def begin
    load_initial_stats

    loop do
      refresh_stats
      sleep 1.minute
      Fiber.yield
    end
  end

  # should only be called once at the very beginning
  private def load_initial_stats
    STATISTICS["software"] = {
      "name"    => @software_config["name"],
      "version" => @software_config["version"],
      "branch"  => @software_config["branch"],
    }
    STATISTICS["openRegistration"] = config.registration_enabled
  end

  private def refresh_stats
    users = STATISTICS.dig("usage", "users").as(Hash(String, Int64))
    users["total"] = db.query_one("SELECT count(*) FROM users", as: Int64)
    users["activeHalfyear"] = db.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '6 months'", as: Int64)
    users["activeMonth"] = db.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '1 month'", as: Int64)
    STATISTICS["metadata"] = {
      "updatedAt"              => Time.utc.to_unix,
      "lastChannelRefreshedAt" => db.query_one?("SELECT updated FROM channels ORDER BY updated DESC LIMIT 1", as: Time).try &.to_unix || 0_i64,
    }
  end
end

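Taken together with the hunks in the main file above: statistics are no longer rebuilt in a hand-rolled spawn loop; the main file appears to register this job inside the config.statistics_enabled block, and the /api/v1/stats route simply serializes the job's STATISTICS constant. Both call sites are shown in the hunks earlier in this diff; pulled together they read roughly:

if config.statistics_enabled
  Invidious::Jobs.register Invidious::Jobs::StatisticsRefreshJob.new(PG_DB, config, SOFTWARE)
end

# inside `get "/api/v1/stats"`:
Invidious::Jobs::StatisticsRefreshJob::STATISTICS.to_json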
							
								
								
									
src/invidious/jobs/subscribe_to_feeds_job.cr (new file, 52 lines)
@@ -0,0 +1,52 @@
class Invidious::Jobs::SubscribeToFeedsJob < Invidious::Jobs::BaseJob
  private getter db : DB::Database
  private getter logger : Invidious::LogHandler
  private getter hmac_key : String
  private getter config : Config

  def initialize(@db, @logger, @config, @hmac_key)
  end

  def begin
    max_threads = 1
    if config.use_pubsub_feeds.is_a?(Int32)
      max_threads = config.use_pubsub_feeds.as(Int32)
    end

    active_threads = 0
    active_channel = Channel(Bool).new

    loop do
      db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs|
        rs.each do
          ucid = rs.read(String)

          if active_threads >= max_threads.as(Int32)
            if active_channel.receive
              active_threads -= 1
            end
          end

          active_threads += 1

          spawn do
            begin
              response = subscribe_pubsub(ucid, hmac_key, config)

              if response.status_code >= 400
                logger.puts("#{ucid} : #{response.body}")
              end
            rescue ex
              logger.puts("#{ucid} : #{ex.message}")
            end

            active_channel.send(true)
          end
        end
      end

      sleep 1.minute
      Fiber.yield
    end
  end
end

src/invidious/jobs/update_decrypt_function_job.cr (new file, 19 lines)
@@ -0,0 +1,19 @@
class Invidious::Jobs::UpdateDecryptFunctionJob < Invidious::Jobs::BaseJob
  DECRYPT_FUNCTION = [] of {SigProc, Int32}

  def begin
    loop do
      begin
        decrypt_function = fetch_decrypt_function
        DECRYPT_FUNCTION.clear
        decrypt_function.each { |df| DECRYPT_FUNCTION << df }
      rescue ex
        # TODO: Log error
        next
      ensure
        sleep 1.minute
        Fiber.yield
      end
    end
  end
end