Skip to content

Commit

Permalink
Refactoring worker to cache writer as much as possible
Browse files Browse the repository at this point in the history
  • Loading branch information
DoumanAsh committed Aug 14, 2024
1 parent b5495d4 commit c29656c
Showing 1 changed file with 37 additions and 22 deletions.
59 changes: 37 additions & 22 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub fn thread<MW: MakeWriter>(tag: &'static str, writer: MW, max_msg_record: usi

let worker = worker.spawn(move || {
let mut msg = fluent::Message::new(tag);
let mut ongoing_writer = None;

'main_loop: loop {
//Fetch up to max_msg_record
Expand All @@ -92,22 +93,28 @@ pub fn thread<MW: MakeWriter>(tag: &'static str, writer: MW, max_msg_record: usi
}
}

let mut writer = match writer.make() {
Ok(writer) => writer,
Err(_) => {
std::thread::sleep(time::Duration::from_secs(1));
match writer.make() {
Ok(writer) => writer,
Err(error) => {
tracing::event!(tracing::Level::DEBUG, "Failed to create fluent writer {}", error);
continue 'main_loop;
let mut writer = match ongoing_writer.take() {
Some(writer) => writer,
None => match writer.make() {
Ok(writer) => writer,
Err(_) => {
std::thread::sleep(time::Duration::from_secs(1));
match writer.make() {
Ok(writer) => writer,
Err(error) => {
tracing::event!(tracing::Level::DEBUG, "Failed to create fluent writer {}", error);
continue 'main_loop;
}
}
}
}
};

match rmp_serde::encode::write(&mut writer, &msg) {
Ok(()) => msg.clear(),
Ok(()) => {
msg.clear();
ongoing_writer = Some(writer);
},
//In case of error we'll just retry at later date.
//Ideally we should be able to recover.
//But report error?
Expand All @@ -119,24 +126,32 @@ pub fn thread<MW: MakeWriter>(tag: &'static str, writer: MW, max_msg_record: usi

if msg.len() > 0 {
//Try to flush last records, but don't wait too much
for _ in 0..2 {
match writer.make() {
Ok(mut writer) => {
if let Err(error) = rmp_serde::encode::write(&mut writer, &msg) {
tracing::event!(tracing::Level::INFO, "Failed to send last records to fluent server {}", error);
for _ in 0..3 {
let mut writer = match ongoing_writer.take() {
Some(writer) => writer,
None => match writer.make() {
Ok(writer) => writer,
Err(_) => {
std::thread::sleep(time::Duration::from_secs(1));
} else {
break;
match writer.make() {
Ok(writer) => writer,
Err(error) => {
tracing::event!(tracing::Level::DEBUG, "Failed to create fluent writer {}", error);
continue
}
}
}
},
Err(error) => {
tracing::event!(tracing::Level::INFO, "Failed to create fluent server {}", error);
std::thread::sleep(time::Duration::from_secs(1));
}
};

if let Err(error) = rmp_serde::encode::write(&mut writer, &msg) {
tracing::event!(tracing::Level::INFO, "Failed to send last records to fluent server {}", error);
std::thread::sleep(time::Duration::from_secs(1));
} else {
break;
}
}
}

})?;

Ok(ThreadWorker {
Expand Down

0 comments on commit c29656c

Please sign in to comment.