mirror of
				https://github.com/iv-org/invidious.git
				synced 2025-10-31 04:32:02 +00:00 
			
		
		
		
	Refactor connect_listen for notifications
This commit is contained in:
		| @@ -186,6 +186,13 @@ spawn do | ||||
|   end | ||||
| end | ||||
|  | ||||
| notification_channels = [] of Channel(PQ::Notification) | ||||
| PG.connect_listen(PG_URL, "notifications") do |event| | ||||
|   notification_channels.each do |channel| | ||||
|     channel.send(event) | ||||
|   end | ||||
| end | ||||
|  | ||||
| proxies = PROXY_LIST | ||||
|  | ||||
| before_all do |env| | ||||
| @@ -4457,17 +4464,37 @@ get "/api/v1/mixes/:rdid" do |env| | ||||
| end | ||||
|  | ||||
| get "/api/v1/auth/notifications" do |env| | ||||
|   env.response.content_type = "text/event-stream" | ||||
|  | ||||
|   topics = env.params.query["topics"]?.try &.split(",").uniq.first(1000) | ||||
|   topics ||= [] of String | ||||
|  | ||||
|   create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics) | ||||
|   notification_channel = Channel(PQ::Notification).new | ||||
|   notification_channels << notification_channel | ||||
|  | ||||
|   begin | ||||
|     create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics, notification_channel) | ||||
|   rescue ex | ||||
|   ensure | ||||
|     notification_channels.delete(notification_channel) | ||||
|   end | ||||
| end | ||||
|  | ||||
| post "/api/v1/auth/notifications" do |env| | ||||
|   env.response.content_type = "text/event-stream" | ||||
|  | ||||
|   topics = env.params.body["topics"]?.try &.split(",").uniq.first(1000) | ||||
|   topics ||= [] of String | ||||
|  | ||||
|   create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics) | ||||
|   notification_channel = Channel(PQ::Notification).new | ||||
|   notification_channels << notification_channel | ||||
|  | ||||
|   begin | ||||
|     create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics, notification_channel) | ||||
|   rescue ex | ||||
|   ensure | ||||
|     notification_channels.delete(notification_channel) | ||||
|   end | ||||
| end | ||||
|  | ||||
| get "/api/v1/auth/preferences" do |env| | ||||
|   | ||||
| @@ -661,89 +661,23 @@ def copy_in_chunks(input, output, chunk_size = 4096) | ||||
|   end | ||||
| end | ||||
|  | ||||
| def create_notification_stream(env, proxies, config, kemal_config, decrypt_function, topics) | ||||
| def create_notification_stream(env, proxies, config, kemal_config, decrypt_function, topics, notification_channel) | ||||
|   locale = LOCALES[env.get("preferences").as(Preferences).locale]? | ||||
|  | ||||
|   env.response.content_type = "text/event-stream" | ||||
|  | ||||
|   since = env.params.query["since"]?.try &.to_i? | ||||
|   id = 0 | ||||
|  | ||||
|   begin | ||||
|     id = 0 | ||||
|  | ||||
|     if topics.includes? "debug" | ||||
|       spawn do | ||||
|         loop do | ||||
|           time_span = [0, 0, 0, 0] | ||||
|           time_span[rand(4)] = rand(30) + 5 | ||||
|           published = Time.now - Time::Span.new(time_span[0], time_span[1], time_span[2], time_span[3]) | ||||
|           video_id = TEST_IDS[rand(TEST_IDS.size)] | ||||
|  | ||||
|           video = get_video(video_id, PG_DB, proxies) | ||||
|           video.published = published | ||||
|           response = JSON.parse(video.to_json(locale, config, kemal_config, decrypt_function)) | ||||
|  | ||||
|           if fields_text = env.params.query["fields"]? | ||||
|             begin | ||||
|               JSONFilter.filter(response, fields_text) | ||||
|             rescue ex | ||||
|               env.response.status_code = 400 | ||||
|               response = {"error" => ex.message} | ||||
|             end | ||||
|           end | ||||
|  | ||||
|           env.response.puts "id: #{id}" | ||||
|           env.response.puts "data: #{response.to_json}" | ||||
|           env.response.puts | ||||
|           env.response.flush | ||||
|  | ||||
|           id += 1 | ||||
|  | ||||
|           sleep 1.minute | ||||
|         end | ||||
|       end | ||||
|     end | ||||
|  | ||||
|   if topics.includes? "debug" | ||||
|     spawn do | ||||
|       if since | ||||
|         topics.try &.each do |topic| | ||||
|           case topic | ||||
|           when .match(/UC[A-Za-z0-9_-]{22}/) | ||||
|             PG_DB.query_all("SELECT * FROM channel_videos WHERE ucid = $1 AND published > $2 ORDER BY published DESC LIMIT 15", | ||||
|               topic, Time.unix(since.not_nil!), as: ChannelVideo).each do |video| | ||||
|               response = JSON.parse(video.to_json(locale, config, Kemal.config)) | ||||
|  | ||||
|               if fields_text = env.params.query["fields"]? | ||||
|                 begin | ||||
|                   JSONFilter.filter(response, fields_text) | ||||
|                 rescue ex | ||||
|                   env.response.status_code = 400 | ||||
|                   response = {"error" => ex.message} | ||||
|                 end | ||||
|               end | ||||
|  | ||||
|               env.response.puts "id: #{id}" | ||||
|               env.response.puts "data: #{response.to_json}" | ||||
|               env.response.puts | ||||
|               env.response.flush | ||||
|  | ||||
|               id += 1 | ||||
|             end | ||||
|           else | ||||
|             # TODO | ||||
|           end | ||||
|         end | ||||
|       end | ||||
|  | ||||
|       PG.connect_listen(PG_URL, "notifications") do |event| | ||||
|         notification = JSON.parse(event.payload) | ||||
|         topic = notification["topic"].as_s | ||||
|         video_id = notification["videoId"].as_s | ||||
|         published = notification["published"].as_i64 | ||||
|       loop do | ||||
|         time_span = [0, 0, 0, 0] | ||||
|         time_span[rand(4)] = rand(30) + 5 | ||||
|         published = Time.now - Time::Span.new(time_span[0], time_span[1], time_span[2], time_span[3]) | ||||
|         video_id = TEST_IDS[rand(TEST_IDS.size)] | ||||
|  | ||||
|         video = get_video(video_id, PG_DB, proxies) | ||||
|         video.published = Time.unix(published) | ||||
|         response = JSON.parse(video.to_json(locale, config, Kemal.config, decrypt_function)) | ||||
|         video.published = published | ||||
|         response = JSON.parse(video.to_json(locale, config, kemal_config, decrypt_function)) | ||||
|  | ||||
|         if fields_text = env.params.query["fields"]? | ||||
|           begin | ||||
| @@ -754,24 +688,88 @@ def create_notification_stream(env, proxies, config, kemal_config, decrypt_funct | ||||
|           end | ||||
|         end | ||||
|  | ||||
|         if topics.try &.includes? topic | ||||
|           env.response.puts "id: #{id}" | ||||
|           env.response.puts "data: #{response.to_json}" | ||||
|           env.response.puts | ||||
|           env.response.flush | ||||
|         env.response.puts "id: #{id}" | ||||
|         env.response.puts "data: #{response.to_json}" | ||||
|         env.response.puts | ||||
|         env.response.flush | ||||
|  | ||||
|           id += 1 | ||||
|         id += 1 | ||||
|  | ||||
|         sleep 1.minute | ||||
|       end | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   spawn do | ||||
|     if since | ||||
|       topics.try &.each do |topic| | ||||
|         case topic | ||||
|         when .match(/UC[A-Za-z0-9_-]{22}/) | ||||
|           PG_DB.query_all("SELECT * FROM channel_videos WHERE ucid = $1 AND published > $2 ORDER BY published DESC LIMIT 15", | ||||
|             topic, Time.unix(since.not_nil!), as: ChannelVideo).each do |video| | ||||
|             response = JSON.parse(video.to_json(locale, config, Kemal.config)) | ||||
|  | ||||
|             if fields_text = env.params.query["fields"]? | ||||
|               begin | ||||
|                 JSONFilter.filter(response, fields_text) | ||||
|               rescue ex | ||||
|                 env.response.status_code = 400 | ||||
|                 response = {"error" => ex.message} | ||||
|               end | ||||
|             end | ||||
|  | ||||
|             env.response.puts "id: #{id}" | ||||
|             env.response.puts "data: #{response.to_json}" | ||||
|             env.response.puts | ||||
|             env.response.flush | ||||
|  | ||||
|             id += 1 | ||||
|           end | ||||
|         else | ||||
|           # TODO | ||||
|         end | ||||
|       end | ||||
|     end | ||||
|   end | ||||
|  | ||||
|     # Send heartbeat | ||||
|   spawn do | ||||
|     loop do | ||||
|       env.response.puts ":keepalive #{Time.now.to_unix}" | ||||
|       env.response.puts | ||||
|       env.response.flush | ||||
|       sleep (20 + rand(11)).seconds | ||||
|       event = notification_channel.receive | ||||
|  | ||||
|       notification = JSON.parse(event.payload) | ||||
|       topic = notification["topic"].as_s | ||||
|       video_id = notification["videoId"].as_s | ||||
|       published = notification["published"].as_i64 | ||||
|  | ||||
|       video = get_video(video_id, PG_DB, proxies) | ||||
|       video.published = Time.unix(published) | ||||
|       response = JSON.parse(video.to_json(locale, config, Kemal.config, decrypt_function)) | ||||
|  | ||||
|       if fields_text = env.params.query["fields"]? | ||||
|         begin | ||||
|           JSONFilter.filter(response, fields_text) | ||||
|         rescue ex | ||||
|           env.response.status_code = 400 | ||||
|           response = {"error" => ex.message} | ||||
|         end | ||||
|       end | ||||
|  | ||||
|       if topics.try &.includes? topic | ||||
|         env.response.puts "id: #{id}" | ||||
|         env.response.puts "data: #{response.to_json}" | ||||
|         env.response.puts | ||||
|         env.response.flush | ||||
|  | ||||
|         id += 1 | ||||
|       end | ||||
|     end | ||||
|   rescue | ||||
|   end | ||||
|  | ||||
|   # Send heartbeat | ||||
|   loop do | ||||
|     env.response.puts ":keepalive #{Time.now.to_unix}" | ||||
|     env.response.puts | ||||
|     env.response.flush | ||||
|     sleep (20 + rand(11)).seconds | ||||
|   end | ||||
| end | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Omar Roth
					Omar Roth