Skip to content

Commit 15a3fe9

Browse files
authored
Event emitter (#955)
* ci: remove env for devcontainer * ci: remove post create script * refactor(backend): more complex structure for application event * refactor(backend): move review posted to core job * chore: remove useless function call * chore(backend): emit event when entity added to collection * feat(backend): send new performer * refactor(backend): do not recreate exercise service in miscelleneous action * refactor(backend): always call job when entity added to collection * docs: change order of integration types informations * fix(backend): call correct function to handle new event * feat(backend): start implementing handler * chore(backend): change order of arguments supplied * feat(backend): complete function * chore(backend): remove useless function calls * feat(database): remove system information column * chore(backend): adapt to new database schema * chore(backend): add debug logging * fix(backend): better error handling * fix(backend): select correct identifier for metadata * chore(database): do not create useless column * fix(backend): do not force an unwrap
1 parent 01af418 commit 15a3fe9

14 files changed

+171
-244
lines changed

.devcontainer/devcontainer.json

+1-10
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,5 @@
44
"service": "ryot-app",
55
"forwardPorts": [],
66
"workspaceFolder": "/workspaces/ryot",
7-
"postCreateCommand": ". ${containerWorkspaceFolder}/.devcontainer/scripts/post-create.sh",
8-
"remoteUser": "archlinux",
9-
"remoteEnv": {
10-
"GIT_AUTHOR_NAME": "${localEnv:GIT_AUTHOR_NAME}",
11-
"GIT_AUTHOR_EMAIL": "${localEnv:GIT_AUTHOR_EMAIL}",
12-
"GIT_COMMITTER_NAME": "${localEnv:GIT_COMMITTER_NAME}",
13-
"GIT_COMMITTER_EMAIL": "${localEnv:GIT_COMMITTER_EMAIL}",
14-
"VISUAL": "hx",
15-
"EDITOR": "hx"
16-
}
7+
"remoteUser": "archlinux"
178
}

.devcontainer/scripts/post-create.sh

-5
This file was deleted.

apps/backend/src/background.rs

+20-18
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use chrono_tz::Tz;
66
use database::{MediaLot, MediaSource};
77
use serde::{Deserialize, Serialize};
88
use strum::Display;
9+
use uuid::Uuid;
910

1011
use crate::{
1112
exporter::ExporterService,
@@ -48,11 +49,6 @@ pub async fn sync_integrations_data(
4849
) -> Result<(), Error> {
4950
tracing::trace!("Getting data from yanked integrations for all users");
5051
misc_service.yank_integrations_data().await.unwrap();
51-
tracing::trace!("Sending data for push integrations for all users");
52-
misc_service
53-
.send_data_for_push_integrations()
54-
.await
55-
.unwrap();
5652
Ok(())
5753
}
5854

@@ -62,7 +58,9 @@ pub async fn sync_integrations_data(
6258
#[derive(Debug, Deserialize, Serialize, Display)]
6359
pub enum CoreApplicationJob {
6460
SyncIntegrationsData(String),
61+
ReviewPosted(ReviewPostedEvent),
6562
BulkProgressUpdate(String, Vec<ProgressUpdateInput>),
63+
EntityAddedToCollection(String, Uuid),
6664
}
6765

6866
impl Message for CoreApplicationJob {
@@ -77,20 +75,23 @@ pub async fn perform_core_application_job(
7775
tracing::trace!("Started job: {:#?}", name);
7876
let start = Instant::now();
7977
let status = match information {
80-
CoreApplicationJob::SyncIntegrationsData(user_id) => {
81-
misc_service
82-
.push_integrations_data_for_user(&user_id)
83-
.await
84-
.ok();
85-
misc_service
86-
.yank_integrations_data_for_user(&user_id)
87-
.await
88-
.is_ok()
78+
CoreApplicationJob::SyncIntegrationsData(user_id) => misc_service
79+
.yank_integrations_data_for_user(&user_id)
80+
.await
81+
.is_ok(),
82+
CoreApplicationJob::ReviewPosted(event) => {
83+
misc_service.handle_review_posted_event(event).await.is_ok()
8984
}
9085
CoreApplicationJob::BulkProgressUpdate(user_id, input) => misc_service
9186
.bulk_progress_update(user_id, input)
9287
.await
9388
.is_ok(),
89+
CoreApplicationJob::EntityAddedToCollection(user_id, collection_to_entity_id) => {
90+
misc_service
91+
.handle_entity_added_to_collection_event(user_id, collection_to_entity_id)
92+
.await
93+
.is_ok()
94+
}
9495
};
9596
tracing::trace!(
9697
"Job: {:#?}, Time Taken: {}ms, Successful = {}",
@@ -111,10 +112,10 @@ pub enum ApplicationJob {
111112
UpdatePerson(String),
112113
RecalculateCalendarEvents,
113114
AssociateGroupWithMetadata(MediaLot, MediaSource, String),
114-
ReviewPosted(ReviewPostedEvent),
115115
PerformExport(String, Vec<ExportItem>),
116116
RecalculateUserSummary(String),
117117
PerformBackgroundTasks,
118+
UpdateExerciseLibrary,
118119
}
119120

120121
impl Message for ApplicationJob {
@@ -171,13 +172,14 @@ pub async fn perform_application_job(
171172
})
172173
.await
173174
.is_ok(),
174-
ApplicationJob::ReviewPosted(event) => {
175-
misc_service.handle_review_posted_event(event).await.is_ok()
176-
}
177175
ApplicationJob::PerformExport(user_id, to_export) => exporter_service
178176
.perform_export(user_id, to_export)
179177
.await
180178
.is_ok(),
179+
ApplicationJob::UpdateExerciseLibrary => exercise_service
180+
.deploy_update_exercise_library_job()
181+
.await
182+
.is_ok(),
181183
};
182184
tracing::trace!(
183185
"Job: {:#?}, Time Taken: {}ms, Successful = {}",

apps/backend/src/entities/collection_to_entity.rs

-4
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ use sea_orm::entity::prelude::*;
55
use serde::{Deserialize, Serialize};
66
use uuid::Uuid;
77

8-
use crate::models::CollectionToEntitySystemInformation;
9-
108
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
119
#[sea_orm(table_name = "collection_to_entity")]
1210
pub struct Model {
@@ -23,8 +21,6 @@ pub struct Model {
2321
pub exercise_id: Option<String>,
2422
pub workout_id: Option<String>,
2523
pub information: Option<serde_json::Value>,
26-
#[sea_orm(column_type = "Json")]
27-
pub system_information: CollectionToEntitySystemInformation,
2824
}
2925

3026
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

apps/backend/src/fitness/resolver.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use slug::slugify;
1818
use struson::writer::{JsonStreamWriter, JsonWriter};
1919

2020
use crate::{
21-
background::ApplicationJob,
21+
background::{ApplicationJob, CoreApplicationJob},
2222
entities::{
2323
collection, collection_to_entity, exercise,
2424
prelude::{CollectionToEntity, Exercise, UserMeasurement, UserToEntity, Workout},
@@ -290,6 +290,7 @@ pub struct ExerciseService {
290290
config: Arc<config::AppConfig>,
291291
file_storage_service: Arc<FileStorageService>,
292292
perform_application_job: MemoryStorage<ApplicationJob>,
293+
perform_core_application_job: MemoryStorage<CoreApplicationJob>,
293294
}
294295

295296
impl ExerciseService {
@@ -298,12 +299,14 @@ impl ExerciseService {
298299
config: Arc<config::AppConfig>,
299300
file_storage_service: Arc<FileStorageService>,
300301
perform_application_job: &MemoryStorage<ApplicationJob>,
302+
perform_core_application_job: &MemoryStorage<CoreApplicationJob>,
301303
) -> Self {
302304
Self {
303305
config,
304306
db: db.clone(),
305307
file_storage_service,
306308
perform_application_job: perform_application_job.clone(),
309+
perform_core_application_job: perform_core_application_job.clone(),
307310
}
308311
}
309312
}
@@ -787,6 +790,7 @@ impl ExerciseService {
787790
exercise_id: Some(exercise.id.clone()),
788791
..Default::default()
789792
},
793+
&self.perform_core_application_job,
790794
)
791795
.await?;
792796
Ok(exercise.id)

apps/backend/src/integrations.rs

+2
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@ impl IntegrationService {
555555
let mut options = RadarrAddMovieOptions::new();
556556
options.search_for_movie = Some(true);
557557
resource.add_options = Some(Box::new(options));
558+
tracing::debug!("Pushing movie to Radarr {:?}", resource);
558559
radarr_api_v3_movie_post(&configuration, Some(resource))
559560
.await
560561
.trace_ok();
@@ -585,6 +586,7 @@ impl IntegrationService {
585586
let mut options = SonarrAddSeriesOptions::new();
586587
options.search_for_missing_episodes = Some(true);
587588
resource.add_options = Some(Box::new(options));
589+
tracing::debug!("Pushing series to Sonarr {:?}", resource);
588590
sonarr_api_v3_series_post(&configuration, Some(resource))
589591
.await
590592
.trace_ok();

apps/backend/src/main.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use apalis::{
1313
layers::{
1414
limit::RateLimitLayer as ApalisRateLimitLayer, tracing::TraceLayer as ApalisTraceLayer,
1515
},
16-
prelude::{MemoryStorage, Monitor, WorkerBuilder, WorkerFactoryFn},
16+
prelude::{MemoryStorage, MessageQueue, Monitor, WorkerBuilder, WorkerFactoryFn},
1717
utils::TokioExecutor,
1818
};
1919
use aws_sdk_s3::config::Region;
@@ -23,6 +23,7 @@ use axum::{
2323
routing::{get, post, Router},
2424
Extension,
2525
};
26+
use background::ApplicationJob;
2627
use chrono::{TimeZone, Utc};
2728
use database::Migrator;
2829
use itertools::Itertools;
@@ -172,9 +173,8 @@ async fn main() -> Result<()> {
172173

173174
if Exercise::find().count(&db).await? == 0 {
174175
tracing::info!("Instance does not have exercises data. Deploying job to download them...");
175-
app_services
176-
.exercise_service
177-
.deploy_update_exercise_library_job()
176+
perform_application_job_storage
177+
.enqueue(ApplicationJob::UpdateExerciseLibrary)
178178
.await
179179
.unwrap();
180180
}
@@ -198,7 +198,7 @@ async fn main() -> Result<()> {
198198
base_dir.join("backend-config-schema.yaml"),
199199
YamlTemplateRenderer::default(),
200200
)
201-
.unwrap();
201+
.ok();
202202

203203
let mut generator = SchemaGenerator::default();
204204
generator.add::<CompleteExport>();
@@ -207,7 +207,7 @@ async fn main() -> Result<()> {
207207
base_dir.join("export-schema.ts"),
208208
TypeScriptRenderer::default(),
209209
)
210-
.unwrap();
210+
.ok();
211211
}
212212

213213
let schema = get_schema(&app_services).await;

0 commit comments

Comments
 (0)