Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for MarkPlayed event from Jellyfin #1288

Merged
merged 6 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/services/integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ impl IntegrationService {
return Err(Error::new("Integration is disabled".to_owned()));
}
let maybe_progress_update = match integration.provider {
IntegrationProvider::Kodi => sink::kodi::yank_progress(payload).await,
IntegrationProvider::Emby => sink::emby::yank_progress(payload, &self.0.db).await,
IntegrationProvider::JellyfinSink => sink::jellyfin::yank_progress(payload).await,
IntegrationProvider::Kodi => sink::kodi::sink_progress(payload).await,
IntegrationProvider::Emby => sink::emby::sink_progress(payload, &self.0.db).await,
IntegrationProvider::JellyfinSink => sink::jellyfin::sink_progress(payload).await,
IntegrationProvider::PlexSink => {
let specifics = integration.clone().provider_specifics.unwrap();
sink::plex::yank_progress(payload, &self.0.db, specifics.plex_sink_username).await
sink::plex::sink_progress(payload, &self.0.db, specifics.plex_sink_username).await
}
IntegrationProvider::GenericJson => sink::generic_json::yank_progress(payload).await,
IntegrationProvider::GenericJson => sink::generic_json::sink_progress(payload).await,
_ => return Err(Error::new("Unsupported integration source".to_owned())),
};
match maybe_progress_update {
Expand Down
2 changes: 1 addition & 1 deletion crates/services/integration/src/sink/emby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ mod models {
}
}

pub async fn yank_progress(payload: String, db: &DatabaseConnection) -> Result<ImportResult> {
pub async fn sink_progress(payload: String, db: &DatabaseConnection) -> Result<ImportResult> {
let payload: models::EmbyWebhookPayload = serde_json::from_str(&payload)?;
let runtime = payload
.item
Expand Down
2 changes: 1 addition & 1 deletion crates/services/integration/src/sink/generic_json.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{bail, Result};
use dependent_models::{CompleteExport, ImportCompletedItem, ImportResult};

pub async fn yank_progress(payload: String) -> Result<ImportResult> {
pub async fn sink_progress(payload: String) -> Result<ImportResult> {
let payload = match serde_json::from_str::<CompleteExport>(&payload) {
Ok(val) => val,
Err(err) => bail!(err),
Expand Down
52 changes: 30 additions & 22 deletions crates/services/integration/src/sink/jellyfin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,26 @@ mod models {
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct JellyfinWebhookItemPayload {
pub run_time_ticks: Option<Decimal>,
#[serde(rename = "Type")]
pub item_type: String,
pub provider_ids: JellyfinWebhookItemProviderIdsPayload,
#[serde(rename = "ParentIndexNumber")]
pub season_number: Option<i32>,
#[serde(rename = "IndexNumber")]
pub episode_number: Option<i32>,
pub run_time_ticks: Option<Decimal>,
pub provider_ids: JellyfinWebhookItemProviderIdsPayload,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct JellyfinWebhookPayload {
pub event: Option<String>,
pub item: JellyfinWebhookItemPayload,
pub series: Option<JellyfinWebhookItemPayload>,
pub session: JellyfinWebhookSessionPayload,
pub session: Option<JellyfinWebhookSessionPayload>,
}
}

pub async fn yank_progress(payload: String) -> Result<ImportResult> {
pub async fn sink_progress(payload: String) -> Result<ImportResult> {
let payload = serde_json::from_str::<models::JellyfinWebhookPayload>(&payload)?;
let identifier = payload
.item
Expand All @@ -62,35 +62,43 @@ pub async fn yank_progress(payload: String) -> Result<ImportResult> {
.ok_or_else(|| anyhow::anyhow!("No TMDb ID associated with this media"))?
.clone();

let runtime = payload
.item
.run_time_ticks
.ok_or_else(|| anyhow::anyhow!("No run time associated with this media"))?;

let position = payload
.session
.play_state
.position_ticks
.ok_or_else(|| anyhow::anyhow!("No position associated with this media"))?;

let lot = match payload.item.item_type.as_str() {
"Episode" => MediaLot::Show,
"Movie" => MediaLot::Movie,
_ => bail!("Only movies and shows supported"),
};

let mut seen_item = ImportOrExportMetadataItemSeen {
show_season_number: payload.item.season_number,
show_episode_number: payload.item.episode_number,
provider_watched_on: Some("Jellyfin".to_string()),
..Default::default()
};

match payload.event.unwrap_or_default().as_str() {
"MarkPlayed" => {}
_ => {
let runtime = payload
.item
.run_time_ticks
.ok_or_else(|| anyhow::anyhow!("No run time associated with this media"))?;

let position = payload
.session
.as_ref()
.and_then(|s| s.play_state.position_ticks.as_ref())
.ok_or_else(|| anyhow::anyhow!("No position associated with this media"))?;

seen_item.progress = Some(position / runtime * dec!(100));
}
}

Ok(ImportResult {
completed: vec![ImportCompletedItem::Metadata(ImportOrExportMetadataItem {
lot,
identifier,
source: MediaSource::Tmdb,
seen_history: vec![ImportOrExportMetadataItemSeen {
progress: Some(position / runtime * dec!(100)),
show_season_number: payload.item.season_number,
show_episode_number: payload.item.episode_number,
provider_watched_on: Some("Jellyfin".to_string()),
..Default::default()
}],
seen_history: vec![seen_item],
..Default::default()
})],
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion crates/services/integration/src/sink/kodi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ struct IntegrationMediaSeen {
show_episode_number: Option<i32>,
}

pub async fn yank_progress(payload: String) -> Result<ImportResult> {
pub async fn sink_progress(payload: String) -> Result<ImportResult> {
let payload = match serde_json::from_str::<IntegrationMediaSeen>(&payload) {
Ok(val) => val,
Err(err) => bail!(err),
Expand Down
2 changes: 1 addition & 1 deletion crates/services/integration/src/sink/plex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn calculate_progress(payload: &models::PlexWebhookPayload) -> Result<Decimal> {
}
}

pub async fn yank_progress(
pub async fn sink_progress(
payload: String,
db: &DatabaseConnection,
plex_user: Option<String>,
Expand Down
2 changes: 1 addition & 1 deletion docs/content/integrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ will work for all the media that have a valid TMDb ID attached to their metadata
- Webhook Url => `<paste_url_copied>`
- Payload format => `Default`
- Listen to events only for => Choose your user
- Events => `Play`, `Pause`, `Resume`, `Stop` and `Progress`
- Events => `Play`, `Pause`, `Resume`, `Stop`, `Progress` and `MarkPlayed`

### Emby

Expand Down