Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 157 additions & 81 deletions apps/labrinth/src/routes/internal/external_notifications.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
use std::collections::HashMap;

use crate::auth::get_user_from_headers;
use crate::database::PgPool;
use crate::database::models::ids::{DBNotificationId, DBUserId};
use crate::database::models::notification_item::DBNotification;
use crate::database::models::notification_item::NotificationBuilder;
use crate::database::models::user_item::DBUser;
use crate::database::redis::RedisPool;
use crate::models::notifications::NotificationDeliveryStatus;
use crate::models::users::Role;
use crate::models::v3::notifications::{
Notification, NotificationBody, NotificationDeliveryStatus,
};
use crate::models::v3::notifications::{Notification, NotificationBody};
use crate::models::v3::pats::Scopes;
use crate::queue::email::EmailQueue;
use crate::queue::session::AuthQueue;
use crate::routes::ApiError;
use crate::routes::internal::external_notifications::EmailFailure::{
FailedToSend, MailboxNotFound, UserNotFound,
};
use crate::routes::internal::statuses::broadcast_friends_message;
use crate::sync::friends::RedisFriendsMessage;
use crate::util::guards::external_notification_key_guard;
use actix_web::http::StatusCode;
use actix_web::web;
use actix_web::{
CustomizeResponder, HttpRequest, HttpResponse, Responder, delete, post,
};
use actix_web::{HttpRequest, HttpResponse, delete, post};
use ariadne::ids::UserId;
use eyre::eyre;
use lettre::message::Mailbox;
use serde::Deserialize;
use serde::{Deserialize, Serialize};

pub fn config(cfg: &mut web::ServiceConfig) {
cfg.service(create)
Expand All @@ -33,66 +35,72 @@ pub fn config(cfg: &mut web::ServiceConfig) {
.service(send_custom_email);
}

#[derive(Deserialize, PartialEq, Default)]
enum EmailStrategy {
#[default]
Async,
Sync,
None,
}

#[derive(Deserialize)]
struct CreateNotification {
pub body: NotificationBody,
pub user_ids: Vec<UserId>,
#[serde(default)]
pub email: EmailStrategy,
}

#[derive(thiserror::Error, Debug, Serialize)]
#[serde(tag = "type", content = "data")]
enum EmailFailure {
#[error("user not found")]
UserNotFound,
#[error("mailbox not found")]
MailboxNotFound,
#[error("failed to send: {0:?}")]
FailedToSend(NotificationDeliveryStatus),
#[error("api error: {0}")]
ApiError(
#[serde(serialize_with = "serialize_api_error")]
#[from]
crate::routes::ApiError,
),
}

fn serialize_api_error<S>(
error: &ApiError,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
error.as_api_error().serialize(serializer)
}

#[post("external_notifications", guard = "external_notification_key_guard")]
pub async fn create(
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
email_queue: web::Data<EmailQueue>,
create_notification: web::Json<CreateNotification>,
) -> Result<HttpResponse, ApiError> {
let CreateNotification { body, user_ids } =
create_notification.into_inner();
let user_ids = user_ids
.into_iter()
.map(|x| DBUserId(x.0 as i64))
.collect::<Vec<_>>();

let mut txn = pool.begin().await?;

if !DBUser::exists_many(&user_ids, &mut txn).await? {
return Err(ApiError::InvalidInput(
"One of the specified users do not exist.".to_owned(),
));
}

let notification_ids = NotificationBuilder { body }
.insert_many(user_ids, &mut txn, &redis)
.await?;

let notifications =
get_site_exposed_notifications(&notification_ids, &mut txn).await?;

txn.commit().await?;

broadcast_notifications(&redis, notifications).await;

Ok(HttpResponse::Accepted().finish())
) -> Result<(web::Json<HashMap<UserId, EmailFailure>>, StatusCode), ApiError> {
create_impl(pool, redis, email_queue, create_notification.into_inner())
.await
}

/// Inserts notifications for all users and tries to send emails immediately.
///
/// Responds with the user IDs that could not be emailed:
/// - `200` if every recipient was emailed (empty list)
/// - `207` if some recipients could not be emailed (list of failed IDs)
#[post(
"external_notifications/email-sync",
guard = "external_notification_key_guard"
)]
pub async fn create_email_sync(
async fn create_impl(
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
email_queue: web::Data<EmailQueue>,
create_notification: web::Json<CreateNotification>,
) -> Result<CustomizeResponder<web::Json<Vec<UserId>>>, ApiError> {
let CreateNotification { body, user_ids } =
create_notification.into_inner();
data: CreateNotification,
) -> Result<(web::Json<HashMap<UserId, EmailFailure>>, StatusCode), ApiError> {
let CreateNotification {
body,
user_ids,
email: email_strategy,
} = data;
let raw_user_ids = user_ids.iter().map(|x| x.0 as i64).collect::<Vec<_>>();

let user_ids = raw_user_ids
.iter()
.map(|x| DBUserId(*x))
Expand Down Expand Up @@ -129,9 +137,21 @@ pub async fn create_email_sync(
.filter(|id| !already_notified.contains(id))
.collect::<Vec<_>>();

let notification_ids = NotificationBuilder { body: body.clone() }
.insert_many_without_delivery(notification_user_ids, &mut txn, &redis)
.await?;
let notification_builder = NotificationBuilder { body: body.clone() };

let notification_ids = if email_strategy == EmailStrategy::Async {
notification_builder
.insert_many(notification_user_ids, &mut txn, &redis)
.await?
} else {
notification_builder
.insert_many_without_delivery(
notification_user_ids,
&mut txn,
&redis,
)
.await?
};

let notifications =
get_site_exposed_notifications(&notification_ids, &mut txn).await?;
Expand All @@ -140,42 +160,98 @@ pub async fn create_email_sync(

broadcast_notifications(&redis, notifications).await;

let mut email_txn = pool.begin().await?;
if email_strategy == EmailStrategy::Sync {
let mut email_txn = pool.begin().await?;

let mut failed = HashMap::new();
let users = DBUser::get_many_ids(&user_ids, &mut email_txn, &redis)
.await?
.into_iter()
.map(|user| (user.id, user))
.collect::<HashMap<_, _>>();

for db_user_id in &user_ids {
let user_id = UserId(db_user_id.0 as u64);
let Some(user) = users.get(db_user_id) else {
failed.insert(user_id, UserNotFound);
continue;
};

let Some(mailbox) = user
.email
.as_ref()
.and_then(|email| email.parse::<Mailbox>().ok())
else {
failed.insert(user_id, MailboxNotFound);
continue;
};

match email_queue
.send_one(&mut email_txn, body.clone(), *db_user_id, mailbox)
.await
{
Ok(status) => {
if status != NotificationDeliveryStatus::Delivered {
failed.insert(user_id, FailedToSend(status));
}
}
Err(error) => {
if matches!(
error,
ApiError::SqlxDatabase(_) | ApiError::Database(_)
) {
return Err(error);
};
failed.insert(user_id, error.into());
}
};
}

let mut failed = Vec::new();
for user_id in &user_ids {
let Some(user) =
DBUser::get_id(*user_id, &mut email_txn, &redis).await?
else {
failed.push(UserId(user_id.0 as u64));
continue;
};
email_txn.commit().await?;

let delivered = match user
.email
.and_then(|email| email.parse::<Mailbox>().ok())
let status = if failed
.values()
.any(|x| matches!(x, EmailFailure::ApiError(_)))
{
Some(mailbox) => {
email_queue
.send_one(&mut email_txn, body.clone(), *user_id, mailbox)
.await?
== NotificationDeliveryStatus::Delivered
}
None => false,
StatusCode::INTERNAL_SERVER_ERROR
} else {
StatusCode::OK
};

if !delivered {
failed.push(UserId(user_id.0 as u64));
}
return Ok((web::Json(failed), status));
}

let status = if failed.is_empty() {
StatusCode::OK
} else {
StatusCode::MULTI_STATUS
};
Ok((web::Json(HashMap::new()), StatusCode::ACCEPTED))
}

Ok(web::Json(failed).customize().with_status(status))
/// Inserts notifications for all users and tries to send emails immediately.
///
/// Responds with the user IDs that could not be emailed and a reason why
#[post(
"external_notifications/email-sync",
Comment thread
fetchfern marked this conversation as resolved.
guard = "external_notification_key_guard"
)]
pub async fn create_email_sync(
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
email_queue: web::Data<EmailQueue>,
data: web::Json<CreateNotification>,
) -> Result<(web::Json<Vec<UserId>>, StatusCode), ApiError> {
let data = data.into_inner();
create_impl(
pool,
redis,
email_queue,
CreateNotification {
body: data.body,
user_ids: data.user_ids,
email: EmailStrategy::Sync,
},
)
.await
.map(|(res, code)| {
(web::Json(res.into_inner().into_keys().collect()), code)
})
}

#[derive(Deserialize)]
Expand Down
Loading