mirror of
				https://github.com/iv-org/invidious.git
				synced 2025-11-04 06:31:57 +00:00 
			
		
		
		
	Refactor connection pooling logic
- Remove duplication between standard and companion pool - Raises a wrapped exception on any DB:Error - Don't use a non-pool client when client fails - Ensure that client is always released - Add documentation to various pool methods
This commit is contained in:
		@@ -35,6 +35,7 @@ require "protodec/utils"
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
require "./invidious/database/*"
 | 
					require "./invidious/database/*"
 | 
				
			||||||
require "./invidious/database/migrations/*"
 | 
					require "./invidious/database/migrations/*"
 | 
				
			||||||
 | 
					require "./invidious/connection/*"
 | 
				
			||||||
require "./invidious/http_server/*"
 | 
					require "./invidious/http_server/*"
 | 
				
			||||||
require "./invidious/helpers/*"
 | 
					require "./invidious/helpers/*"
 | 
				
			||||||
require "./invidious/yt_backend/*"
 | 
					require "./invidious/yt_backend/*"
 | 
				
			||||||
@@ -91,17 +92,17 @@ SOFTWARE = {
 | 
				
			|||||||
  "branch"  => "#{CURRENT_BRANCH}",
 | 
					  "branch"  => "#{CURRENT_BRANCH}",
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
YT_POOL = YoutubeConnectionPool.new(YT_URL, max_capacity: CONFIG.pool_size, idle_capacity: CONFIG.idle_pool_size)
 | 
					YT_POOL = Invidious::ConnectionPool::Pool.new(YT_URL, max_capacity: CONFIG.pool_size, idle_capacity: CONFIG.idle_pool_size)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Image request pool
 | 
					# Image request pool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
GGPHT_POOL = YoutubeConnectionPool.new(
 | 
					GGPHT_POOL = Invidious::ConnectionPool::Pool.new(
 | 
				
			||||||
  URI.parse("https://yt3.ggpht.com"),
 | 
					  URI.parse("https://yt3.ggpht.com"),
 | 
				
			||||||
  max_capacity: CONFIG.pool_size,
 | 
					  max_capacity: CONFIG.pool_size,
 | 
				
			||||||
  idle_capacity: CONFIG.idle_pool_size
 | 
					  idle_capacity: CONFIG.idle_pool_size
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
COMPANION_POOL = CompanionConnectionPool.new(
 | 
					COMPANION_POOL = Invidious::ConnectionPool::CompanionPool.new(
 | 
				
			||||||
  max_capacity: CONFIG.pool_size,
 | 
					  max_capacity: CONFIG.pool_size,
 | 
				
			||||||
  idle_capacity: CONFIG.idle_pool_size
 | 
					  idle_capacity: CONFIG.idle_pool_size
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										116
									
								
								src/invidious/connection/pool.cr
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										116
									
								
								src/invidious/connection/pool.cr
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,116 @@
 | 
				
			|||||||
 | 
					module Invidious::ConnectionPool
 | 
				
			||||||
 | 
					  # The base connection pool that provides the underlying logic that all connection pools are based around
 | 
				
			||||||
 | 
					  #
 | 
				
			||||||
 | 
					  # Uses `DB::Pool` for the pooling logic
 | 
				
			||||||
 | 
					  abstract struct BaseConnectionPool(PoolClient)
 | 
				
			||||||
 | 
					    # Returns the max size of the connection pool
 | 
				
			||||||
 | 
					    getter max_capacity : Int32
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Returns the configured checkout time out
 | 
				
			||||||
 | 
					    getter timeout : Float64
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Creates a connection pool with the provided options
 | 
				
			||||||
 | 
					    def initialize(
 | 
				
			||||||
 | 
					      @max_capacity : Int32 = 5,
 | 
				
			||||||
 | 
					      @idle_capacity : Int32? = nil,
 | 
				
			||||||
 | 
					      @timeout : Float64 = 5.0
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					      @pool = build_pool()
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Returns the idle capacity for the connection pool; if unset this is the same as `max_capacity`.
 | 
				
			||||||
 | 
					    #
 | 
				
			||||||
 | 
					    # This means that when idle capacity is unset the pool will keep all connections around forever, all the
 | 
				
			||||||
 | 
					    # way until it reaches max capacity.
 | 
				
			||||||
 | 
					    def idle_capacity : Int32
 | 
				
			||||||
 | 
					      if (idle = @idle_capacity).nil?
 | 
				
			||||||
 | 
					        return @max_capacity
 | 
				
			||||||
 | 
					      end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      return idle
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Returns the underlying `DB::Pool` object
 | 
				
			||||||
 | 
					    abstract def pool : DB::Pool(PoolClient)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Checks out a client from the pool
 | 
				
			||||||
 | 
					    def client(&)
 | 
				
			||||||
 | 
					      pool.checkout do |http_client|
 | 
				
			||||||
 | 
					        # Proxy needs to be reinstated every time we get a client from the pool
 | 
				
			||||||
 | 
					        http_client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        response = yield http_client
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return response
 | 
				
			||||||
 | 
					      rescue ex : DB::Error
 | 
				
			||||||
 | 
					        # Prevent broken client from being checked back into the pool
 | 
				
			||||||
 | 
					        http_client.close
 | 
				
			||||||
 | 
					        raise ConnectionPool::Error.new(ex.message, cause: ex)
 | 
				
			||||||
 | 
					      ensure
 | 
				
			||||||
 | 
					        pool.release(http_client)
 | 
				
			||||||
 | 
					      end
 | 
				
			||||||
 | 
					    rescue ex : DB::PoolTimeout
 | 
				
			||||||
 | 
					      # Failed to checkout a client
 | 
				
			||||||
 | 
					      raise ConnectionPool::Error.new(ex.message, cause: ex)
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Builds a connection pool
 | 
				
			||||||
 | 
					    private abstract def build_pool : DB::Pool(PoolClient)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Creates a `DB::Pool::Options` used for constructing `DB::Pool`
 | 
				
			||||||
 | 
					    private def pool_options : DB::Pool::Options
 | 
				
			||||||
 | 
					      return DB::Pool::Options.new(
 | 
				
			||||||
 | 
					        initial_pool_size: 0,
 | 
				
			||||||
 | 
					        max_pool_size: max_capacity,
 | 
				
			||||||
 | 
					        max_idle_pool_size: idle_capacity,
 | 
				
			||||||
 | 
					        checkout_timeout: timeout
 | 
				
			||||||
 | 
					      )
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  # A basic connection pool where each client within is set to connect to a single resource
 | 
				
			||||||
 | 
					  struct Pool < BaseConnectionPool(HTTP::Client)
 | 
				
			||||||
 | 
					    # The url each client within the pool will connect to
 | 
				
			||||||
 | 
					    getter url : URI
 | 
				
			||||||
 | 
					    getter pool : DB::Pool(HTTP::Client)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Creates a pool of clients that connects to the given url, with the provided options.
 | 
				
			||||||
 | 
					    def initialize(
 | 
				
			||||||
 | 
					      url : URI,
 | 
				
			||||||
 | 
					      *,
 | 
				
			||||||
 | 
					      @max_capacity : Int32 = 5,
 | 
				
			||||||
 | 
					      @idle_capacity : Int32? = nil,
 | 
				
			||||||
 | 
					      @timeout : Float64 = 5.0
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      @url = url
 | 
				
			||||||
 | 
					      @pool = build_pool()
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # :inherit:
 | 
				
			||||||
 | 
					    private def build_pool : DB::Pool(HTTP::Client)
 | 
				
			||||||
 | 
					      return DB::Pool(HTTP::Client).new(pool_options) do
 | 
				
			||||||
 | 
					        make_client(url, force_resolve: true)
 | 
				
			||||||
 | 
					      end
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  # A modified connection pool for the interacting with Invidious companion.
 | 
				
			||||||
 | 
					  #
 | 
				
			||||||
 | 
					  # The main difference is that clients in this pool are created with different urls
 | 
				
			||||||
 | 
					  # based on what is randomly selected from the configured list of companions
 | 
				
			||||||
 | 
					  struct CompanionPool < BaseConnectionPool(HTTP::Client)
 | 
				
			||||||
 | 
					    getter pool : DB::Pool(HTTP::Client)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # :inherit:
 | 
				
			||||||
 | 
					    private def build_pool : DB::Pool(HTTP::Client)
 | 
				
			||||||
 | 
					      return DB::Pool(HTTP::Client).new(pool_options) do
 | 
				
			||||||
 | 
					        companion = CONFIG.invidious_companion.sample
 | 
				
			||||||
 | 
					        make_client(companion.private_url, use_http_proxy: false)
 | 
				
			||||||
 | 
					      end
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  class Error < Exception
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
@@ -1,111 +1,6 @@
 | 
				
			|||||||
# Mapping of subdomain => YoutubeConnectionPool
 | 
					# Mapping of subdomain => Invidious::ConnectionPool::Pool
 | 
				
			||||||
# This is needed as we may need to access arbitrary subdomains of ytimg
 | 
					# This is needed as we may need to access arbitrary subdomains of ytimg
 | 
				
			||||||
private YTIMG_POOLS = {} of String => YoutubeConnectionPool
 | 
					private YTIMG_POOLS = {} of String => Invidious::ConnectionPool::Pool
 | 
				
			||||||
 | 
					 | 
				
			||||||
struct YoutubeConnectionPool
 | 
					 | 
				
			||||||
  property! url : URI
 | 
					 | 
				
			||||||
  property! max_capacity : Int32
 | 
					 | 
				
			||||||
  property! idle_capacity : Int32
 | 
					 | 
				
			||||||
  property! timeout : Float64
 | 
					 | 
				
			||||||
  property pool : DB::Pool(HTTP::Client)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def initialize(
 | 
					 | 
				
			||||||
    url : URI,
 | 
					 | 
				
			||||||
    *,
 | 
					 | 
				
			||||||
    @max_capacity : Int32 = 5,
 | 
					 | 
				
			||||||
    idle_capacity : Int32? = nil,
 | 
					 | 
				
			||||||
    @timeout : Float64 = 5.0,
 | 
					 | 
				
			||||||
  )
 | 
					 | 
				
			||||||
    if idle_capacity.nil?
 | 
					 | 
				
			||||||
      @idle_capacity = @max_capacity
 | 
					 | 
				
			||||||
    else
 | 
					 | 
				
			||||||
      @idle_capacity = idle_capacity
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @url = url
 | 
					 | 
				
			||||||
    @pool = build_pool()
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def client(&)
 | 
					 | 
				
			||||||
    conn = pool.checkout
 | 
					 | 
				
			||||||
    # Proxy needs to be reinstated every time we get a client from the pool
 | 
					 | 
				
			||||||
    conn.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    begin
 | 
					 | 
				
			||||||
      response = yield conn
 | 
					 | 
				
			||||||
    rescue ex
 | 
					 | 
				
			||||||
      conn.close
 | 
					 | 
				
			||||||
      conn = make_client(url, force_resolve: true)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      response = yield conn
 | 
					 | 
				
			||||||
    ensure
 | 
					 | 
				
			||||||
      pool.release(conn)
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    response
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  private def build_pool
 | 
					 | 
				
			||||||
    # We call the getter for the instance variables instead of using them directly
 | 
					 | 
				
			||||||
    # because the getters defined by property! ensures that the value is not a nil
 | 
					 | 
				
			||||||
    options = DB::Pool::Options.new(
 | 
					 | 
				
			||||||
      initial_pool_size: 0,
 | 
					 | 
				
			||||||
      max_pool_size: max_capacity,
 | 
					 | 
				
			||||||
      max_idle_pool_size: idle_capacity,
 | 
					 | 
				
			||||||
      checkout_timeout: timeout
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    DB::Pool(HTTP::Client).new(options) do
 | 
					 | 
				
			||||||
      next make_client(url, force_resolve: true)
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
struct CompanionConnectionPool
 | 
					 | 
				
			||||||
  property! max_capacity : Int32
 | 
					 | 
				
			||||||
  property! idle_capacity : Int32
 | 
					 | 
				
			||||||
  property! timeout : Float64
 | 
					 | 
				
			||||||
  property pool : DB::Pool(HTTP::Client)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def initialize(*, @max_capacity : Int32 = 5, idle_capacity : Int32? = nil, @timeout : Float64 = 5.0)
 | 
					 | 
				
			||||||
    if idle_capacity.nil?
 | 
					 | 
				
			||||||
      @idle_capacity = @max_capacity
 | 
					 | 
				
			||||||
    else
 | 
					 | 
				
			||||||
      @idle_capacity = idle_capacity
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    options = DB::Pool::Options.new(
 | 
					 | 
				
			||||||
      initial_pool_size: 0,
 | 
					 | 
				
			||||||
      max_pool_size: max_capacity,
 | 
					 | 
				
			||||||
      max_idle_pool_size: idle_capacity.not_nil!,
 | 
					 | 
				
			||||||
      checkout_timeout: timeout
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @pool = DB::Pool(HTTP::Client).new(options) do
 | 
					 | 
				
			||||||
      companion = CONFIG.invidious_companion.sample
 | 
					 | 
				
			||||||
      next make_client(companion.private_url, use_http_proxy: false)
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def client(&)
 | 
					 | 
				
			||||||
    conn = pool.checkout
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    begin
 | 
					 | 
				
			||||||
      response = yield conn
 | 
					 | 
				
			||||||
    rescue ex
 | 
					 | 
				
			||||||
      conn.close
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      companion = CONFIG.invidious_companion.sample
 | 
					 | 
				
			||||||
      conn = make_client(companion.private_url, use_http_proxy: false)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      response = yield conn
 | 
					 | 
				
			||||||
    ensure
 | 
					 | 
				
			||||||
      pool.release(conn)
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    response
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
end
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
def add_yt_headers(request)
 | 
					def add_yt_headers(request)
 | 
				
			||||||
  request.headers.delete("User-Agent") if request.headers["User-Agent"] == "Crystal"
 | 
					  request.headers.delete("User-Agent") if request.headers["User-Agent"] == "Crystal"
 | 
				
			||||||
@@ -169,7 +64,7 @@ def get_ytimg_pool(subdomain)
 | 
				
			|||||||
    return pool
 | 
					    return pool
 | 
				
			||||||
  else
 | 
					  else
 | 
				
			||||||
    LOGGER.info("ytimg_pool: Creating a new HTTP pool for \"https://#{subdomain}.ytimg.com\"")
 | 
					    LOGGER.info("ytimg_pool: Creating a new HTTP pool for \"https://#{subdomain}.ytimg.com\"")
 | 
				
			||||||
    pool = YoutubeConnectionPool.new(
 | 
					    pool = Invidious::ConnectionPool::Pool.new(
 | 
				
			||||||
      URI.parse("https://#{subdomain}.ytimg.com"),
 | 
					      URI.parse("https://#{subdomain}.ytimg.com"),
 | 
				
			||||||
      max_capacity: CONFIG.pool_size,
 | 
					      max_capacity: CONFIG.pool_size,
 | 
				
			||||||
      idle_capacity: CONFIG.idle_pool_size
 | 
					      idle_capacity: CONFIG.idle_pool_size
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user