Add triggers for updating feeds

This commit is contained in:
Omar Roth 2019-05-26 11:28:54 -05:00
parent 890d485bb5
commit b016a60a75
No known key found for this signature in database
GPG Key ID: B8254FB7EC3D37F2
3 changed files with 65 additions and 13 deletions

View File

@ -2845,16 +2845,25 @@ post "/feed/webhook/:token" do |env|
premiere_timestamp: video.premiere_timestamp, premiere_timestamp: video.premiere_timestamp,
) )
PG_DB.exec("UPDATE users SET notifications = notifications || $1 \ users = PG_DB.query_all("UPDATE users SET notifications = notifications || $1 \
WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications)", video.id, video.published, video.ucid) WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email",
video.id, video.published, video.ucid, as: String)
video_array = video.to_a video_array = video.to_a
args = arg_array(video_array) args = arg_array(video_array)
PG_DB.exec("INSERT INTO channel_videos VALUES (#{args}) \ PG_DB.exec("INSERT INTO channel_videos VALUES (#{args}) \
ON CONFLICT (id) DO UPDATE SET title = $2, published = $3, \ ON CONFLICT (id) DO UPDATE SET title = $2, published = $3, \
updated = $4, ucid = $5, author = $6, length_seconds = $7, \ updated = $4, ucid = $5, author = $6, length_seconds = $7, \
live_now = $8, premiere_timestamp = $9", video_array) live_now = $8, premiere_timestamp = $9", video_array)
users.each do |user|
payload = {
"email" => user,
"action" => "refresh",
}.to_json
PG_DB.exec("NOTIFY feeds, E'#{payload}'")
end
end end
end end

View File

@ -178,18 +178,27 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
premiere_timestamp: premiere_timestamp premiere_timestamp: premiere_timestamp
) )
db.exec("UPDATE users SET notifications = notifications || $1 \ users = db.query_all("UPDATE users SET notifications = notifications || $1 \
WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications)", video.id, video.published, ucid) WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email",
video.id, video.published, ucid, as: String)
video_array = video.to_a video_array = video.to_a
args = arg_array(video_array) args = arg_array(video_array)
# We don't include the 'premire_timestamp' here because channel pages don't include them, # We don't include the 'premiere_timestamp' here because channel pages don't include them,
# meaning the above timestamp is always null # meaning the above timestamp is always null
db.exec("INSERT INTO channel_videos VALUES (#{args}) \ db.exec("INSERT INTO channel_videos VALUES (#{args}) \
ON CONFLICT (id) DO UPDATE SET title = $2, published = $3, \ ON CONFLICT (id) DO UPDATE SET title = $2, published = $3, \
updated = $4, ucid = $5, author = $6, length_seconds = $7, \ updated = $4, ucid = $5, author = $6, length_seconds = $7, \
live_now = $8", video_array) live_now = $8", video_array)
users.each do |user|
payload = {
"email" => user,
"action" => "refresh",
}.to_json
PG_DB.exec("NOTIFY feeds, E'#{payload}'")
end
end end
if pull_all_videos if pull_all_videos
@ -233,18 +242,29 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil)
videos.each do |video| videos.each do |video|
ids << video.id ids << video.id
# FIXME: Red videos don't provide published date, so the best we can do is ignore them # We are notified of Red videos elsewhere (PubSub), which includes a correct published date,
# so since they don't provide a published date here we can safely ignore them.
if Time.now - video.published > 1.minute if Time.now - video.published > 1.minute
db.exec("UPDATE users SET notifications = notifications || $1 \ users = db.query_all("UPDATE users SET notifications = notifications || $1 \
WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications)", video.id, video.published, video.ucid) WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email",
video.id, video.published, video.ucid, as: String)
video_array = video.to_a video_array = video.to_a
args = arg_array(video_array) args = arg_array(video_array)
# We don't update the 'premire_timestamp' here because channel pages don't include them # We don't update the 'premire_timestamp' here because channel pages don't include them
db.exec("INSERT INTO channel_videos VALUES (#{args}) \ db.exec("INSERT INTO channel_videos VALUES (#{args}) \
ON CONFLICT (id) DO UPDATE SET title = $2, updated = $4, \ ON CONFLICT (id) DO UPDATE SET title = $2, updated = $4, \
ucid = $5, author = $6, length_seconds = $7, live_now = $8", video_array) ucid = $5, author = $6, length_seconds = $7, live_now = $8", video_array)
# Update all users affected by insert
users.each do |user|
payload = {
"email" => user,
"action" => "refresh",
}.to_json
PG_DB.exec("NOTIFY feeds, E'#{payload}'")
end
end end
end end

View File

@ -45,6 +45,29 @@ end
def refresh_feeds(db, logger, max_threads = 1) def refresh_feeds(db, logger, max_threads = 1)
max_channel = Channel(Int32).new max_channel = Channel(Int32).new
# TODO: Make this config option, similar to use_pubsub
# Spawn thread to handle feed events
if max_threads > 0
spawn do
PG.connect_listen(PG_URL, "feeds") do |event|
spawn do
feed = JSON.parse(event.payload)
email = feed["email"].as_s
action = feed["action"].as_s
view_name = "subscriptions_#{sha256(email)}"
case action
when "refresh"
db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
end
end
Fiber.yield
end
end
end
spawn do spawn do
max_threads = max_channel.receive max_threads = max_channel.receive
active_threads = 0 active_threads = 0