diff --git a/apps/labrinth/src/routes/internal/external_notifications.rs b/apps/labrinth/src/routes/internal/external_notifications.rs index 942af86166..e62359c3f0 100644 --- a/apps/labrinth/src/routes/internal/external_notifications.rs +++ b/apps/labrinth/src/routes/internal/external_notifications.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use crate::auth::get_user_from_headers; use crate::database::PgPool; use crate::database::models::ids::{DBNotificationId, DBUserId}; @@ -5,26 +7,26 @@ use crate::database::models::notification_item::DBNotification; use crate::database::models::notification_item::NotificationBuilder; use crate::database::models::user_item::DBUser; use crate::database::redis::RedisPool; +use crate::models::notifications::NotificationDeliveryStatus; use crate::models::users::Role; -use crate::models::v3::notifications::{ - Notification, NotificationBody, NotificationDeliveryStatus, -}; +use crate::models::v3::notifications::{Notification, NotificationBody}; use crate::models::v3::pats::Scopes; use crate::queue::email::EmailQueue; use crate::queue::session::AuthQueue; use crate::routes::ApiError; +use crate::routes::internal::external_notifications::EmailFailure::{ + FailedToSend, MailboxNotFound, UserNotFound, +}; use crate::routes::internal::statuses::broadcast_friends_message; use crate::sync::friends::RedisFriendsMessage; use crate::util::guards::external_notification_key_guard; use actix_web::http::StatusCode; use actix_web::web; -use actix_web::{ - CustomizeResponder, HttpRequest, HttpResponse, Responder, delete, post, -}; +use actix_web::{HttpRequest, HttpResponse, delete, post}; use ariadne::ids::UserId; use eyre::eyre; use lettre::message::Mailbox; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; pub fn config(cfg: &mut web::ServiceConfig) { cfg.service(create) @@ -33,66 +35,72 @@ pub fn config(cfg: &mut web::ServiceConfig) { .service(send_custom_email); } +#[derive(Deserialize, PartialEq, Default)] +enum EmailStrategy { + #[default] + Async, + Sync, + None, +} + #[derive(Deserialize)] struct CreateNotification { pub body: NotificationBody, pub user_ids: Vec, + #[serde(default)] + pub email: EmailStrategy, +} + +#[derive(thiserror::Error, Debug, Serialize)] +#[serde(tag = "type", content = "data")] +enum EmailFailure { + #[error("user not found")] + UserNotFound, + #[error("mailbox not found")] + MailboxNotFound, + #[error("failed to send: {0:?}")] + FailedToSend(NotificationDeliveryStatus), + #[error("api error: {0}")] + ApiError( + #[serde(serialize_with = "serialize_api_error")] + #[from] + crate::routes::ApiError, + ), +} + +fn serialize_api_error( + error: &ApiError, + serializer: S, +) -> Result +where + S: serde::Serializer, +{ + error.as_api_error().serialize(serializer) } #[post("external_notifications", guard = "external_notification_key_guard")] pub async fn create( pool: web::Data, redis: web::Data, + email_queue: web::Data, create_notification: web::Json, -) -> Result { - let CreateNotification { body, user_ids } = - create_notification.into_inner(); - let user_ids = user_ids - .into_iter() - .map(|x| DBUserId(x.0 as i64)) - .collect::>(); - - let mut txn = pool.begin().await?; - - if !DBUser::exists_many(&user_ids, &mut txn).await? { - return Err(ApiError::InvalidInput( - "One of the specified users do not exist.".to_owned(), - )); - } - - let notification_ids = NotificationBuilder { body } - .insert_many(user_ids, &mut txn, &redis) - .await?; - - let notifications = - get_site_exposed_notifications(¬ification_ids, &mut txn).await?; - - txn.commit().await?; - - broadcast_notifications(&redis, notifications).await; - - Ok(HttpResponse::Accepted().finish()) +) -> Result<(web::Json>, StatusCode), ApiError> { + create_impl(pool, redis, email_queue, create_notification.into_inner()) + .await } -/// Inserts notifications for all users and tries to send emails immediately. -/// -/// Responds with the user IDs that could not be emailed: -/// - `200` if every recipient was emailed (empty list) -/// - `207` if some recipients could not be emailed (list of failed IDs) -#[post( - "external_notifications/email-sync", - guard = "external_notification_key_guard" -)] -pub async fn create_email_sync( +async fn create_impl( pool: web::Data, redis: web::Data, email_queue: web::Data, - create_notification: web::Json, -) -> Result>>, ApiError> { - let CreateNotification { body, user_ids } = - create_notification.into_inner(); + data: CreateNotification, +) -> Result<(web::Json>, StatusCode), ApiError> { + let CreateNotification { + body, + user_ids, + email: email_strategy, + } = data; let raw_user_ids = user_ids.iter().map(|x| x.0 as i64).collect::>(); - let user_ids = raw_user_ids .iter() .map(|x| DBUserId(*x)) @@ -129,9 +137,21 @@ pub async fn create_email_sync( .filter(|id| !already_notified.contains(id)) .collect::>(); - let notification_ids = NotificationBuilder { body: body.clone() } - .insert_many_without_delivery(notification_user_ids, &mut txn, &redis) - .await?; + let notification_builder = NotificationBuilder { body: body.clone() }; + + let notification_ids = if email_strategy == EmailStrategy::Async { + notification_builder + .insert_many(notification_user_ids, &mut txn, &redis) + .await? + } else { + notification_builder + .insert_many_without_delivery( + notification_user_ids, + &mut txn, + &redis, + ) + .await? + }; let notifications = get_site_exposed_notifications(¬ification_ids, &mut txn).await?; @@ -140,42 +160,98 @@ pub async fn create_email_sync( broadcast_notifications(&redis, notifications).await; - let mut email_txn = pool.begin().await?; + if email_strategy == EmailStrategy::Sync { + let mut email_txn = pool.begin().await?; + + let mut failed = HashMap::new(); + let users = DBUser::get_many_ids(&user_ids, &mut email_txn, &redis) + .await? + .into_iter() + .map(|user| (user.id, user)) + .collect::>(); + + for db_user_id in &user_ids { + let user_id = UserId(db_user_id.0 as u64); + let Some(user) = users.get(db_user_id) else { + failed.insert(user_id, UserNotFound); + continue; + }; + + let Some(mailbox) = user + .email + .as_ref() + .and_then(|email| email.parse::().ok()) + else { + failed.insert(user_id, MailboxNotFound); + continue; + }; + + match email_queue + .send_one(&mut email_txn, body.clone(), *db_user_id, mailbox) + .await + { + Ok(status) => { + if status != NotificationDeliveryStatus::Delivered { + failed.insert(user_id, FailedToSend(status)); + } + } + Err(error) => { + if matches!( + error, + ApiError::SqlxDatabase(_) | ApiError::Database(_) + ) { + return Err(error); + }; + failed.insert(user_id, error.into()); + } + }; + } - let mut failed = Vec::new(); - for user_id in &user_ids { - let Some(user) = - DBUser::get_id(*user_id, &mut email_txn, &redis).await? - else { - failed.push(UserId(user_id.0 as u64)); - continue; - }; + email_txn.commit().await?; - let delivered = match user - .email - .and_then(|email| email.parse::().ok()) + let status = if failed + .values() + .any(|x| matches!(x, EmailFailure::ApiError(_))) { - Some(mailbox) => { - email_queue - .send_one(&mut email_txn, body.clone(), *user_id, mailbox) - .await? - == NotificationDeliveryStatus::Delivered - } - None => false, + StatusCode::INTERNAL_SERVER_ERROR + } else { + StatusCode::OK }; - if !delivered { - failed.push(UserId(user_id.0 as u64)); - } + return Ok((web::Json(failed), status)); } - let status = if failed.is_empty() { - StatusCode::OK - } else { - StatusCode::MULTI_STATUS - }; + Ok((web::Json(HashMap::new()), StatusCode::ACCEPTED)) +} - Ok(web::Json(failed).customize().with_status(status)) +/// Inserts notifications for all users and tries to send emails immediately. +/// +/// Responds with the user IDs that could not be emailed and a reason why +#[post( + "external_notifications/email-sync", + guard = "external_notification_key_guard" +)] +pub async fn create_email_sync( + pool: web::Data, + redis: web::Data, + email_queue: web::Data, + data: web::Json, +) -> Result<(web::Json>, StatusCode), ApiError> { + let data = data.into_inner(); + create_impl( + pool, + redis, + email_queue, + CreateNotification { + body: data.body, + user_ids: data.user_ids, + email: EmailStrategy::Sync, + }, + ) + .await + .map(|(res, code)| { + (web::Json(res.into_inner().into_keys().collect()), code) + }) } #[derive(Deserialize)]