Skip to content

Commit

Permalink
rework tear down of result worker threads
Browse files Browse the repository at this point in the history
It seems the previous way of tearing down the threads occasionally crashed naemon during a reload.
  • Loading branch information
sni committed Nov 6, 2024
1 parent 54120e4 commit 0b3248d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 40 deletions.
37 changes: 24 additions & 13 deletions neb_module_naemon/mod_gearman.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ static pthread_mutex_t mod_gm_log_lock = PTHREAD_MUTEX_INITIALIZER;
void *gearman_module_handle=NULL;

int result_threads_running;
pthread_t result_thr[GM_LISTSIZE];
int gm_should_terminate = FALSE;
pthread_t * result_thr[GM_LISTSIZE];
char target_queue[GM_SMALLBUFSIZE];
char temp_buffer[GM_MAX_OUTPUT];
char uniq[GM_SMALLBUFSIZE];
Expand Down Expand Up @@ -96,6 +97,7 @@ static int try_check_dummy(const char *, host *, service * );
int nebmodule_init( int flags, char *args, nebmodule *handle ) {
int broker_option_errors = 0;
result_threads_running = 0;
gm_should_terminate = FALSE;

/* save our handle */
gearman_module_handle=handle;
Expand Down Expand Up @@ -185,15 +187,10 @@ int nebmodule_init( int flags, char *args, nebmodule *handle ) {
schedule_event(1, move_results_to_core, NULL);

/* log at least one line into the core logfile */
if ( mod_gm_opt->logmode != GM_LOG_MODE_CORE ) {
int logmode_saved = mod_gm_opt->logmode;
mod_gm_opt->logmode = GM_LOG_MODE_CORE;
if(strlen(GIT_HASH) > 0)
gm_log( GM_LOG_INFO, "initialized version %s (build: %s) (libgearman %s)\n", GM_VERSION, GIT_HASH, gearman_version() );
else
gm_log( GM_LOG_INFO, "initialized version %s (libgearman %s)\n", GM_VERSION, gearman_version() );
mod_gm_opt->logmode = logmode_saved;
}
if(strlen(GIT_HASH) > 0)
nm_log( NSLOG_INFO_MESSAGE, "mod_gearman: initialized version %s (build: %s) (libgearman %s)\n", GM_VERSION, GIT_HASH, gearman_version() );
else
nm_log( NSLOG_INFO_MESSAGE, "mod_gearman: initialized version %s (libgearman %s)\n", GM_VERSION, gearman_version() );

gm_log( GM_LOG_DEBUG, "finished initializing\n" );

Expand Down Expand Up @@ -238,6 +235,7 @@ static void register_neb_callbacks(void) {
int nebmodule_deinit( int flags, int reason ) {
int x;

nm_log( NSLOG_INFO_MESSAGE, "mod_gearman: deinitializing\n" );
gm_log( GM_LOG_TRACE, "nebmodule_deinit(%i, %i)\n", flags, reason );

/* should be removed already, but just for the case it wasn't */
Expand Down Expand Up @@ -273,9 +271,13 @@ int nebmodule_deinit( int flags, int reason ) {
gm_log( GM_LOG_DEBUG, "deregistered callbacks\n" );

/* stop result threads */
gm_should_terminate = TRUE;
for(x = 0; x < result_threads_running; x++) {
pthread_cancel(result_thr[x]);
pthread_join(result_thr[x], NULL);
if(pthread_join(*(result_thr[x]), NULL) != OK) {
gm_log( GM_LOG_ERROR, "failed to join result thread\n" );
}
gm_free(result_thr[x]);
result_thr[x] = NULL;
}

/* cleanup */
Expand Down Expand Up @@ -343,12 +345,21 @@ void mod_gm_add_result_to_list(check_result * newcheckresult) {

/*
 * Start the result worker threads.
 *
 * Does nothing when result_threads_running has already reached
 * mod_gm_opt->result_workers. Otherwise it walks the slots
 * 0 .. mod_gm_opt->result_workers-1, starts one result_worker thread per
 * slot and records the thread handle in result_thr[x]. Every thread gets
 * the address of the shared result_threads_running counter as its argument.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers. The
 * pthread_create() call that writes into &result_thr[x] looks like the
 * pre-change line and the malloc()-based call below it looks like its
 * replacement -- confirm against the real file before relying on this text.
 */
static void start_threads(void) {
int ret = 0;
pthread_t *thr;
if ( result_threads_running < mod_gm_opt->result_workers ) {
/* create result worker */
int x;
for(x = 0; x < mod_gm_opt->result_workers; x++) {
/* counted up front; taken back below when thread creation fails */
result_threads_running++;
pthread_create ( &result_thr[x], NULL, result_worker, (void *)&result_threads_running);
/* NOTE(review): the malloc() result is passed to pthread_create() unchecked */
thr = malloc(sizeof(pthread_t));
/* pthread_create() returns an error number, hence strerror(ret) */
if((ret = pthread_create ( thr, NULL, result_worker, (void *)&result_threads_running)) != OK) {
gm_log( GM_LOG_ERROR, "failed to create result thread: %s\n", strerror(ret));
/*
 * NOTE(review): thr is not released on this path. The slot is left NULL
 * while the counter is decremented, yet the join loop in nebmodule_deinit
 * dereferences result_thr[x] for x < result_threads_running without a
 * NULL check -- verify how a failed slot followed by a good one behaves.
 */
result_thr[x] = NULL;
result_threads_running--;
} else {
/* the heap-allocated handle is now owned by result_thr[x] */
result_thr[x] = thr;
}
}
}
}
Expand Down
54 changes: 27 additions & 27 deletions neb_module_naemon/result_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extern float current_submit_rate;
extern float current_avg_submit_duration;
extern double current_submit_max;
extern int result_threads_running;
extern int gm_should_terminate;

__thread EVP_CIPHER_CTX * result_ctx = NULL; /* make ssl context local in each thread */

Expand All @@ -54,20 +55,6 @@ static struct check_engine mod_gearman_check_engine = {
NULL
};

/*
 * Cleanup handler for a result worker thread; result_worker registers it
 * with pthread_cleanup_push().
 *
 * data: NULL, or the address of the thread's gearman_worker_st pointer.
 *       A non-NULL value is handed to gm_free_worker().
 *
 * In addition the thread-local result_ctx is passed to
 * mod_gm_crypt_deinit() and a debug line is logged. The function itself
 * only cleans up; it does not terminate the thread.
 *
 * NOTE(review): in this diff view the function appears to be the code the
 * commit removes (its cleanup steps show up inline at the end of
 * result_worker) -- confirm against the real file.
 */
static void cancel_worker_thread (void * data) {

if(data != NULL) {
/* data carries the address of the worker pointer, not the worker itself */
gearman_worker_st **worker = (gearman_worker_st**) data;
gm_free_worker(worker);
}

/* result_ctx is declared __thread, so this releases this thread's context */
mod_gm_crypt_deinit(result_ctx);
gm_log( GM_LOG_DEBUG, "worker thread finished\n" );

return;
}

/* callback for task completed */
void *result_worker( void * data ) {
gearman_worker_st *worker = NULL;
Expand All @@ -77,28 +64,40 @@ void *result_worker( void * data ) {
gm_log( GM_LOG_TRACE, "worker %d started\n", *worker_num );
gethostname(hostname, GM_SMALLBUFSIZE-1);

pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

set_worker(&worker);
pthread_cleanup_push(cancel_worker_thread, (void*) &worker);

result_ctx = mod_gm_crypt_init(mod_gm_opt->crypt_key);

while ( 1 ) {
while( gm_should_terminate == FALSE ) {
ret = gearman_worker_work(worker);
if ( ret != GEARMAN_SUCCESS && ret != GEARMAN_WORK_FAIL ) {
if ( ret != GEARMAN_TIMEOUT)
gm_log( GM_LOG_ERROR, "worker error: %s\n", gearman_worker_error(worker));
switch(ret) {
case GEARMAN_SUCCESS:
case GEARMAN_TIMEOUT:
break;
case GEARMAN_NO_JOBS:
case GEARMAN_IO_WAIT:
usleep(100000); // wait 100ms
break;
case GEARMAN_WORK_FAIL:
default:
gm_log( GM_LOG_ERROR, "worker error: %s\n", gearman_worker_error(worker));

gm_free_worker(&worker);
sleep(1);

set_worker(&worker);
break;
gm_log( GM_LOG_ERROR, "worker error: %s\n", gearman_worker_error(worker));
break;
}
}

pthread_cleanup_pop(0);
if(worker != NULL) {
gm_free_worker(&worker);
}

mod_gm_crypt_deinit(result_ctx);
gm_log( GM_LOG_DEBUG, "worker thread finished\n" );

return NULL;
}

Expand Down Expand Up @@ -342,9 +341,10 @@ int set_worker( gearman_worker_st **worker ) {
/* add our dummy queue, gearman sometimes forgets the last added queue */
worker_add_function( w, "dummy", dummy);

/* let our worker renew itself every 30 seconds */
if(mod_gm_opt->server_num > 1)
gearman_worker_set_timeout(w, 30000);
// Required to shut down gracefully:
// the shutdown condition is only checked after each check result, so in the
// worst case, without any job, shutdown will take up to 5 seconds.
gearman_worker_set_timeout(w, 5000);

return GM_OK;
}

0 comments on commit 0b3248d

Please sign in to comment.