Skip to content

Commit

Permalink
improve shutdown of result thread
Browse files Browse the repository at this point in the history
  • Loading branch information
sni committed Nov 18, 2024
1 parent 1c32bae commit bca73c1
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 17 deletions.
13 changes: 8 additions & 5 deletions common/gearman_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,14 +261,17 @@ void gm_free_client(gearman_client_st **client) {

/* free worker structure */
void gm_free_worker(gearman_worker_st **worker) {
if(worker == NULL)
gearman_worker_st *temp_worker = NULL;

if(worker == NULL) {
return;
if(*worker == NULL)
}
if(*worker == NULL) {
return;
gearman_worker_unregister_all(*worker);
gearman_worker_remove_servers(*worker);
gearman_worker_free(*worker);
}
temp_worker = *worker;
*worker = NULL;
gearman_worker_free(temp_worker);
}

/* get worker/jobs data from gearman server */
Expand Down
14 changes: 2 additions & 12 deletions neb_module_naemon/result_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ extern int result_threads_running;
extern int gm_should_terminate;

__thread EVP_CIPHER_CTX * result_ctx = NULL; /* make ssl context local in each thread */
__thread int is_processing_job = FALSE;

static const char *gearman_worker_source_name(void *source) {
if(!source)
Expand All @@ -61,9 +60,6 @@ static void cancel_worker_thread(void * data) {
if(data == NULL) {
return;
}
if(is_processing_job == TRUE) {
return;
}

gearman_worker_st **worker = (gearman_worker_st**) data;
gm_free_worker(worker);
Expand Down Expand Up @@ -106,9 +102,10 @@ void *result_worker( void * data ) {
gm_log( GM_LOG_ERROR, "worker error: %s\n", gearman_worker_error(worker));
if( gm_should_terminate == TRUE )
break;

gm_free_worker(&worker);
sleep(1);
if( gm_should_terminate == TRUE )
break;
set_worker(&worker);
break;
}
Expand All @@ -135,7 +132,6 @@ void *get_results( gearman_job_st *job, __attribute__((__unused__)) void *contex
char *ptr;
double now_f, core_starttime_f, starttime_f, finishtime_f, exec_time, latency;
size_t wsize = 0;
is_processing_job = TRUE;

/* for calculating real latency */
gettimeofday(&now,NULL);
Expand All @@ -151,7 +147,6 @@ void *get_results( gearman_job_st *job, __attribute__((__unused__)) void *contex
workload = (const char *)gearman_job_workload(job);
if(workload == NULL) {
*ret_ptr = GEARMAN_WORK_FAIL;
is_processing_job = FALSE;
return NULL;
}
gm_log( GM_LOG_TRACE, "got result %s\n", gearman_job_handle(job));
Expand Down Expand Up @@ -181,13 +176,11 @@ void *get_results( gearman_job_st *job, __attribute__((__unused__)) void *contex
total_submit_jobs,
total_submit_errors
);
is_processing_job = FALSE;
return((void*)result);
}

if(decrypted_data == NULL) {
*ret_ptr = GEARMAN_WORK_FAIL;
is_processing_job = FALSE;
return NULL;
}
gm_log( GM_LOG_TRACE, "%zu --->\n%s\n<---\n", strlen(decrypted_data), decrypted_data );
Expand All @@ -213,7 +206,6 @@ void *get_results( gearman_job_st *job, __attribute__((__unused__)) void *contex
if ( ( chk_result = ( check_result * )gm_malloc( sizeof *chk_result ) ) == 0 ) {
*ret_ptr = GEARMAN_WORK_FAIL;
gm_free(decrypted_data_c);
is_processing_job = FALSE;
return NULL;
}
init_check_result(chk_result);
Expand Down Expand Up @@ -283,7 +275,6 @@ void *get_results( gearman_job_st *job, __attribute__((__unused__)) void *contex
if ( chk_result->host_name == NULL || chk_result->output == NULL ) {
*ret_ptr= GEARMAN_WORK_FAIL;
gm_log( GM_LOG_ERROR, "discarded invalid job (%s), check your encryption settings\n", gearman_job_handle( job ) );
is_processing_job = FALSE;
return NULL;
}

Expand Down Expand Up @@ -342,7 +333,6 @@ void *get_results( gearman_job_st *job, __attribute__((__unused__)) void *contex

gm_free(decrypted_data_c);

is_processing_job = FALSE;
return NULL;
}

Expand Down

0 comments on commit bca73c1

Please sign in to comment.