Skip to content

Commit

Permalink
Recommendation endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 18, 2024
1 parent c978c09 commit 79b2058
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 49 deletions.
30 changes: 30 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,34 @@ services:
restart: always
attach: true

# May not be used locally but it's a good example on how to run it from production with:
# docker compose --profile pagerank-profile run pagerank
pagerank:
build:
context: .
target: final
environment:
- APP__followers__neo4j_uri=db:7687
- APP__followers__neo4j_user=neo4j
- APP__followers__neo4j_password=mydevpassword
- APP__ENVIRONMENT=development
- GOOGLE_APPLICATION_CREDENTIALS=/app/gcloud/application_default_credentials.json
- RUST_LOG=nos_followers=debug
- RUST_BACKTRACE=1
volumes:
- ./config:/app/config
- cargo-registry:/usr/local/cargo/registry
- cargo-git:/usr/local/cargo/git/db
- build-cache:/app/target
- ${HOME}/.config/gcloud/application_default_credentials.json:/app/gcloud/application_default_credentials.json
entrypoint: ["/bin/pagerank"]
depends_on:
db:
condition: service_healthy
restart: "no"
profiles:
- pagerank-profile

relay:
image: ghcr.io/planetary-social/nosrelay:latest
ports:
Expand All @@ -47,6 +75,8 @@ services:
- NEO4J_apoc_import_file_use__neo4j__config=true
- NEO4J_PLUGINS=["apoc", "graph-data-science"]
- NEO4J_dbms_logs_debug_level=DEBUG
- NEO4J_dbms_logs_query_enabled=VERBOSE
- NEO4J_dbms_logs_query_threshold=0
volumes:
- db-data:/data
- db-logs:/logs
Expand Down
7 changes: 7 additions & 0 deletions src/bin/pagerank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ async fn main() -> Result<()> {
.context("Failed to connect to Neo4j")?;

let repo = Arc::new(Repo::new(graph));
repo.log_neo4j_details().await?;

info!("Update memory graph");
if let Err(e) = repo.update_memory_graph().await {
error!("Memory graph update failed: {:?}", e);
return Err(e).context("Memory graph update encountered an error");
}

info!("Executing PageRank update");
if let Err(e) = repo.update_pagerank().await {
Expand Down
15 changes: 14 additions & 1 deletion src/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
mod router;
use crate::repo::Repo;
use crate::repo::RepoTrait;
use anyhow::{Context, Result};
use axum::Router;
use router::create_router;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{error, info};

pub struct AppState<T>
where
T: RepoTrait,
{
pub repo: Arc<T>,
}

pub struct HttpServer;
impl HttpServer {
pub fn start(
task_tracker: TaskTracker,
http_port: u16,
repo: Arc<Repo>,
cancellation_token: CancellationToken,
) -> Result<()> {
let router = create_router()?;
let state = Arc::new(AppState { repo }); // Create the shared state
let router = create_router(state)?; // Pass the state to the router

start_http_server(task_tracker, http_port, router, cancellation_token);

Ok(())
Expand Down
76 changes: 71 additions & 5 deletions src/http_server/router.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
use super::AppState;
use crate::metrics::setup_metrics;
use crate::repo::{Recommendation, RepoError, RepoTrait};
use anyhow::Result;
use axum::{http::HeaderMap, response::Html};
use axum::{response::IntoResponse, routing::get, Router};
use axum::Json;
use axum::{
extract::State, http::HeaderMap, http::StatusCode, response::Html, response::IntoResponse,
routing::get, Router,
};
use nostr_sdk::PublicKey;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tower_http::trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer};
use tower_http::LatencyUnit;
use tower_http::{timeout::TimeoutLayer, trace::DefaultOnFailure};
use tracing::Level;

pub fn create_router() -> Result<Router> {
pub fn create_router<T>(state: Arc<AppState<T>>) -> Result<Router>
where
T: RepoTrait + 'static, // 'static is needed because the router needs to be static
{
let tracing_layer = TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::new().level(Level::INFO))
.on_response(
Expand All @@ -19,15 +30,44 @@ pub fn create_router() -> Result<Router> {
.on_failure(DefaultOnFailure::new().level(Level::ERROR));

let metrics_handle = setup_metrics()?;

Ok(Router::new()
.route("/", get(serve_root_page))
.route("/metrics", get(|| async move { metrics_handle.render() }))
.route(
"/recommendations/:pubkey",
get(get_recommendations_handler::<T>),
) // Make handler generic
.layer(tracing_layer)
.layer(TimeoutLayer::new(Duration::from_secs(1)))
.route("/metrics", get(|| async move { metrics_handle.render() })))
.with_state(state)) // Attach state to the router
}

async fn get_recommendations_handler<T>(
State(state): State<Arc<AppState<T>>>, // Extract shared state with generic RepoTrait
axum::extract::Path(pubkey): axum::extract::Path<String>, // Extract pubkey from the path
) -> Result<Json<Vec<Recommendation>>, ApiError>
where
T: RepoTrait,
{
let public_key = PublicKey::from_hex(&pubkey).map_err(|_| ApiError::InvalidPublicKey)?;
cached_get_recommendations(state.repo.clone(), public_key)
.await
.map_err(ApiError::from)
}

async fn cached_get_recommendations<T>(
repo: Arc<T>,
public_key: PublicKey,
) -> Result<Json<Vec<Recommendation>>, RepoError>
where
T: RepoTrait,
{
let recommendations = repo.get_recommendations(&public_key).await?;
Ok(Json(recommendations))
}

async fn serve_root_page(_headers: HeaderMap) -> impl IntoResponse {
// TODO: Some stats or useful info about the server here?
let body = r#"
<html>
<head>
Expand All @@ -41,3 +81,29 @@ async fn serve_root_page(_headers: HeaderMap) -> impl IntoResponse {

Html(body)
}

#[derive(Error, Debug)]
pub enum ApiError {
#[error("Invalid public key")]
InvalidPublicKey,
#[error(transparent)]
RepoError(#[from] RepoError),
#[error(transparent)]
AxumError(#[from] axum::Error),
}

impl IntoResponse for ApiError {
fn into_response(self) -> axum::response::Response {
let (status, body) = match self {
ApiError::InvalidPublicKey => {
(StatusCode::BAD_REQUEST, "Invalid public key".to_string())
}
ApiError::RepoError(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
"Repository error".to_string(),
),
ApiError::AxumError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Axum error".to_string()),
};
(status, body).into_response()
}
}
5 changes: 4 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use nos_followers::{
http_server::HttpServer,
migrations::apply_migrations,
relay_subscriber::{create_client, start_nostr_subscription},
repo::Repo,
repo::{Repo, RepoTrait},
scheduler::start_scheduler,
tcp_importer::start_tcp_importer,
worker_pool::WorkerPool,
Expand Down Expand Up @@ -95,6 +95,8 @@ async fn start(settings: Settings) -> Result<()> {
.context("Failed applying migrations")?;
let repo = Arc::new(Repo::new(graph));

repo.log_neo4j_details().await?;

info!("Initializing workers for follower list diff calculation");
let shared_nostr_client = Arc::new(create_client());
let (follow_change_sender, _) =
Expand Down Expand Up @@ -176,6 +178,7 @@ async fn start(settings: Settings) -> Result<()> {
HttpServer::start(
task_tracker.clone(),
settings.http_port,
repo.clone(),
cancellation_token.clone(),
)?;

Expand Down
Loading

0 comments on commit 79b2058

Please sign in to comment.