wrap subscribe into transaction

This commit is contained in:
Andre Heber
2024-02-27 15:36:49 +01:00
parent fd4ba6ae31
commit 859f383822

View File

@ -3,7 +3,7 @@ use rand::{distributions::Alphanumeric, thread_rng, Rng};
use serde::Deserialize;
use chrono::Utc;
use uuid::Uuid;
use sqlx::{query, Pool, Postgres};
use sqlx::{query, Pool, Postgres, Transaction};
use lettre::{
address::AddressError, message::{header::ContentType, Mailbox}, transport::smtp::authentication::Credentials, Message, SmtpTransport, Transport
};
@ -34,12 +34,19 @@ pub async fn subscribe(
Ok(subscriber) => subscriber,
Err(_) => return HttpResponse::BadRequest().finish(),
};
let subscriber_id = match insert_subscriber(&new_subscriber, &connection_pool).await {
let mut transaction = match connection_pool.begin().await {
Ok(transaction) => transaction,
Err(_) => return HttpResponse::InternalServerError().finish(),
};
let subscriber_id = match insert_subscriber(&new_subscriber, &mut transaction).await {
Ok(subscriber_id) => subscriber_id,
Err(_) => return HttpResponse::InternalServerError().finish(),
};
let subscription_token = generate_confirmation_token();
if store_token(&connection_pool, &subscriber_id, &subscription_token).await.is_err() {
if store_token(&mut transaction, &subscriber_id, &subscription_token).await.is_err() {
return HttpResponse::InternalServerError().finish();
}
if transaction.commit().await.is_err() {
return HttpResponse::InternalServerError().finish();
}
if send_confirmation_email(&email_client, new_subscriber, &base_url.0, &subscription_token).await.is_err() {
@ -139,9 +146,9 @@ impl TryFrom<FormData> for NewSubscriber {
#[tracing::instrument(
name = "Saving new subscriber details in the database",
skip(new_subscriber, connection_pool)
skip(new_subscriber, transaction)
)]
async fn insert_subscriber(new_subscriber: &NewSubscriber, connection_pool: &Pool<Postgres>) -> Result<Uuid, sqlx::Error> {
async fn insert_subscriber(new_subscriber: &NewSubscriber, transaction: &mut Transaction<'_, Postgres>) -> Result<Uuid, sqlx::Error> {
let subscriber_id = Uuid::new_v4();
query!(
r#"
@ -153,7 +160,7 @@ async fn insert_subscriber(new_subscriber: &NewSubscriber, connection_pool: &Poo
new_subscriber.name.as_ref(),
Utc::now()
)
.execute(connection_pool)
.execute(&mut **transaction)
.await
.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
@ -164,9 +171,9 @@ async fn insert_subscriber(new_subscriber: &NewSubscriber, connection_pool: &Poo
#[tracing::instrument(
name = "Storing subscription token in the database",
skip(connection_pool, subscriber_id, subscription_token)
skip(transaction, subscriber_id, subscription_token)
)]
async fn store_token(connection_pool: &Pool<Postgres>, subscriber_id: &Uuid, subscription_token: &str) -> Result<(), sqlx::Error> {
async fn store_token(transaction: &mut Transaction<'_, Postgres>, subscriber_id: &Uuid, subscription_token: &str) -> Result<(), sqlx::Error> {
query!(
r#"
INSERT INTO subscription_tokens (subscription_token, subscriber_id)
@ -175,7 +182,7 @@ async fn store_token(connection_pool: &Pool<Postgres>, subscriber_id: &Uuid, sub
subscription_token,
subscriber_id
)
.execute(connection_pool)
.execute(&mut **transaction)
.await
.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);