Skip to content

Commit

Permalink
Merge pull request #7 from discosat/reserved-procedure-slots
Browse files Browse the repository at this point in the history
add support for reserved/pre-compiled procedures
  • Loading branch information
NValsted authored May 3, 2024
2 parents 8fcab91 + 0edb552 commit bc73664
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 114 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ proc run 1 # Run the procedure to log data points when the vehicle is within th

One can then imagine an extended scenario where there is e.g. a third node responsible for some actuator that must react based on the sensor node, which adds another layer of coordination to the system - all this can be orchestrated using the DSL!

### Utilizing reserved, pre-programmable procedure slots (Not yet fully implemented)
There is also (_planned_) support for pre-programmed procedures in reserved slots, which can be used to simplify the DSL code or take care of more complex operations. For example, one can write a function compiled into the application on node 1 that calculates the euclidean distance between points defined by (`lat`, `lon`) and (`target_lat`, `target_lon`) and stores the result in the `dist` parameter. For the sake of example, let's assumed this procedure is available in slot 0. Then the geo-fencing setup procedure can be simplified as follows:
### Utilizing reserved, pre-programmable procedure slots
There is also support for pre-programmed procedures in reserved slots, which can be used to simplify the DSL code or take care of more complex operations. For example, one can write a function compiled into the application on node 1 that calculates the euclidean distance between points defined by (`lat`, `lon`) and (`target_lat`, `target_lon`) and stores the result in the `dist` parameter. For the sake of example, let's assumed this procedure is available in slot 0. Then the geo-fencing setup procedure can be simplified as follows:
```bash
# procedure 1 (geo-fencing setup)
proc new
Expand Down
5 changes: 3 additions & 2 deletions include/csp_proc/proc_analyze.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ extern "C" {
#endif

#include <csp_proc/proc_types.h>
#include <csp_proc/proc_store.h>

typedef struct {
int is_tail_call;
Expand All @@ -29,7 +30,7 @@ typedef struct {
typedef struct proc_analysis_t proc_analysis_t;

struct proc_analysis_t {
proc_t * proc;
proc_union_t proc_union;
proc_analysis_t ** sub_analyses;
size_t sub_analysis_count;
uint8_t * procedure_slots;
Expand All @@ -50,7 +51,7 @@ typedef struct proc_analysis_config_t {

void free_proc_analysis(proc_analysis_t * analysis);

int proc_analyze(proc_t * proc, proc_analysis_t * analysis, proc_analysis_config_t * config);
int proc_analyze(proc_union_t proc_union, proc_analysis_t * analysis, proc_analysis_config_t * config);

#ifdef __cplusplus
}
Expand Down
24 changes: 22 additions & 2 deletions include/csp_proc/proc_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ extern "C" {
#include <csp_proc/proc_types.h>

extern proc_mutex_t * proc_store_mutex;
typedef int (*compiled_proc_t)();

extern compiled_proc_t proc_reserved_slots_array[RESERVED_PROC_SLOTS];

typedef enum {
PROC_TYPE_NONE,
PROC_TYPE_DSL,
PROC_TYPE_COMPILED
} proc_type_t;

/**
* A union of procedures defined via the DSL and pre-compiled procedures.
*/
typedef struct {
proc_type_t type;
union {
proc_t * dsl_proc;
compiled_proc_t compiled_proc;
} proc;
} proc_union_t;

/**
* Delete/reinitialize a fresh procedure from the procedure storage.
Expand Down Expand Up @@ -44,9 +64,9 @@ int __attribute__((weak)) set_proc(proc_t * proc, uint8_t slot, int overwrite);
*
* @param slot The slot to get the procedure from
*
* @return The procedure at the specified slot, or NULL if the slot is empty
* @return The procedure at the specified slot (either a DSL procedure or a pre-compiled procedure, also known as reserved procedure)
*/
proc_t * __attribute__((weak)) get_proc(uint8_t slot);
proc_union_t __attribute__((weak)) get_proc(uint8_t slot);

/**
* Get the slots of the procedures in the procedure storage with an instruction count greater than 0.
Expand Down
16 changes: 10 additions & 6 deletions src/proc_analyze.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include <csp_proc/proc_analyze.h>
#include <csp_proc/proc_store.h>
#include <csp_proc/proc_memory.h>

/**
Expand Down Expand Up @@ -80,9 +79,9 @@ int analyze_instruction(proc_t * proc, uint8_t instruction_index, proc_instructi
* @param analysis The proc_analysis_t to populate
* @param config The configuration for the analysis
*/
int proc_analyze(proc_t * proc, proc_analysis_t * analysis, proc_analysis_config_t * config) {
int proc_analyze(proc_union_t proc_union, proc_analysis_t * analysis, proc_analysis_config_t * config) {
printf("Analyzing procedure\n");
analysis->proc = proc;
analysis->proc_union = proc_union;
analysis->deallocation_mark = 0;

analysis->sub_analyses = NULL;
Expand All @@ -91,6 +90,11 @@ int proc_analyze(proc_t * proc, proc_analysis_t * analysis, proc_analysis_config
analysis->procedure_slots = NULL;
analysis->procedure_slot_count = 0;

if (proc_union.type != PROC_TYPE_DSL) {
return 0;
}
proc_t * proc = proc_union.proc.dsl_proc;

analysis->instruction_analyses = proc_calloc(proc->instruction_count, sizeof(proc_instruction_analysis_t));
if (analysis->instruction_analyses == NULL) {
printf("Error allocating memory for instruction_analyses\n");
Expand Down Expand Up @@ -125,8 +129,8 @@ int proc_analyze(proc_t * proc, proc_analysis_t * analysis, proc_analysis_config
return -1;
}

proc_t * sub_proc = get_proc(instruction->instruction.call.procedure_slot);
if (sub_proc == NULL) {
proc_union_t sub_proc_union = get_proc(instruction->instruction.call.procedure_slot);
if (sub_proc_union.type != PROC_TYPE_DSL && sub_proc_union.type != PROC_TYPE_COMPILED) {
printf("Error fetching sub-procedure from procedure store\n");
return -1;
}
Expand All @@ -146,7 +150,7 @@ int proc_analyze(proc_t * proc, proc_analysis_t * analysis, proc_analysis_config
// Mark the procedure as analyzed
config->analyzed_procs[instruction->instruction.call.procedure_slot] = 1;
config->analyses[instruction->instruction.call.procedure_slot] = sub_analysis;
proc_analyze(sub_proc, sub_analysis, config);
proc_analyze(sub_proc_union, sub_analysis, config);
}

analysis->sub_analyses[analysis->sub_analysis_count] = sub_analysis;
Expand Down
13 changes: 12 additions & 1 deletion src/proc_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,18 @@ static void proc_serve_del_request(csp_packet_t * packet) {

static void proc_serve_pull_request(csp_packet_t * packet) {
uint8_t slot = packet->data[1];
proc_t * procedure = get_proc(slot);
if (slot < RESERVED_PROC_SLOTS) {
printf("Reserved procedure requested\n");
packet->data[0] = PROC_PULL_RESPONSE;
packet->data[0] |= PROC_FLAG_END;
packet->data[0] |= PROC_FLAG_ERROR;
packet->length = 1;
csp_sendto_reply(packet, packet, CSP_O_SAME);
return;
}

proc_union_t proc_union = get_proc(slot);
proc_t * procedure = proc_union.proc.dsl_proc;
if (procedure == NULL) {
printf("Procedure not found\n");
packet->data[0] = PROC_PULL_RESPONSE;
Expand Down
78 changes: 52 additions & 26 deletions src/runtime/proc_runtime_FreeRTOS.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
int proc_instructions_exec(proc_t * proc, proc_analysis_t * analysis);

typedef struct {
proc_t * proc;
proc_union_t * proc_union;
TaskHandle_t task_handle;
} task_t;

Expand Down Expand Up @@ -53,7 +53,9 @@ int proc_stop_runtime_task(TaskHandle_t task_handle) {
for (size_t i = 0; i < running_tasks_count; i++) {
if (running_tasks[i].task_handle == task_handle) {
vTaskDelete(running_tasks[i].task_handle);
free_proc(running_tasks[i].proc);
if (running_tasks[i].proc_union->type == PROC_TYPE_DSL) {
free_proc(running_tasks[i].proc_union->proc.dsl_proc);
}
running_tasks[i] = running_tasks[running_tasks_count - 1];
running_tasks = proc_realloc(running_tasks, --running_tasks_count * sizeof(task_t));
break;
Expand All @@ -79,9 +81,7 @@ int proc_stop_all_runtime_tasks() {
return inf_loop_guard < 1000 ? 0 : -1;
}

void runtime_task(void * pvParameters) {
proc_t * proc = (proc_t *)pvParameters;

int dsl_proc_exec(proc_union_t proc_union) {
// Perform static analysis
proc_analysis_config_t analysis_config = {
.analyzed_procs = proc_calloc(MAX_PROC_SLOT + 1, sizeof(int)),
Expand All @@ -93,17 +93,38 @@ void runtime_task(void * pvParameters) {
return -1;
}

if (proc_analyze(proc, analysis, &analysis_config) != 0) {
if (proc_analyze(proc_union, analysis, &analysis_config) != 0) {
csp_print("Error analyzing procedure\n");
return -1;
}

int ret = proc_instructions_exec(proc, analysis); // TODO: set error flag param
int ret = proc_instructions_exec(proc_union.proc.dsl_proc, analysis);

// Procedure finished, clean up
free_proc_analysis(analysis);
free_proc(proc_union.proc.dsl_proc);

return ret;
}

void runtime_task(void * pvParameters) {
proc_union_t * proc_union = (proc_union_t *)pvParameters;

int ret;
switch (proc_union->type) {
case PROC_TYPE_DSL:
ret = dsl_proc_exec(*proc_union);
break;
case PROC_TYPE_COMPILED:
ret = proc_union->proc.compiled_proc();
break;
default:
ret = -1;
}
// TODO: set error flag param if ret != 0

// Procedure finished, clean up
if (xSemaphoreTake(running_tasks_mutex, portMAX_DELAY) != pdTRUE) {
free_proc_analysis(analysis);
free_proc(proc);
vTaskDelete(NULL);
return;
}
Expand All @@ -116,9 +137,8 @@ void runtime_task(void * pvParameters) {
}
}
xSemaphoreGive(running_tasks_mutex);
proc_free(proc_union);
csp_print("Procedure finished (%s)\n", pcTaskGetName(task_handle));
free_proc_analysis(analysis);
free_proc(proc);
vTaskDelete(NULL);
}

Expand All @@ -129,46 +149,52 @@ int proc_runtime_run(uint8_t proc_slot) {
return -1;
}

proc_t * stored_proc = get_proc(proc_slot);
proc_union_t * stored_proc = proc_malloc(sizeof(proc_union_t));
*stored_proc = get_proc(proc_slot);

if (stored_proc == NULL) {
if (stored_proc->type != PROC_TYPE_DSL && stored_proc->type != PROC_TYPE_COMPILED) {
csp_print("Procedure in slot %d not found\n", proc_slot);
return -1;
}
if (stored_proc->instruction_count == 0) {
csp_print("Procedure in slot %d has no instructions\n", proc_slot);
return -1;
}

// Copy procedure to detach from proc store
proc_t * detached_proc = proc_malloc(sizeof(proc_t));
if (deepcopy_proc(stored_proc, detached_proc) != 0) {
csp_print("Failed to copy procedure\n");
return -1;
if (stored_proc->type == PROC_TYPE_DSL) {
// Copy procedure to detach from proc store
proc_t * detached_proc = proc_malloc(sizeof(proc_t));
if (deepcopy_proc(stored_proc->proc.dsl_proc, detached_proc) != 0) {
csp_print("Failed to copy procedure\n");
return -1;
}
stored_proc->proc.dsl_proc = detached_proc;
}

// Create task
if (xSemaphoreTake(running_tasks_mutex, portMAX_DELAY) != pdTRUE) { // taking mutex early to prevent clean-up from the newly spawned task before it's added to the task array
free_proc(detached_proc);
if (stored_proc->type == PROC_TYPE_DSL) {
free_proc(stored_proc->proc.dsl_proc);
}
proc_free(stored_proc);
return -1;
}

TaskHandle_t task_handle;
char task_name[configMAX_TASK_NAME_LEN];
sprintf(task_name, "RNTM%d", proc_slot);
BaseType_t task_create_ret;
task_create_ret = xTaskCreate(runtime_task, task_name, PROC_RUNTIME_TASK_SIZE, detached_proc, PROC_RUNTIME_TASK_PRIORITY, &task_handle);
task_create_ret = xTaskCreate(runtime_task, task_name, PROC_RUNTIME_TASK_SIZE, stored_proc, PROC_RUNTIME_TASK_PRIORITY, &task_handle);

if (task_create_ret != pdPASS) {
csp_print("Failed to create task\n");
free_proc(detached_proc);
if (stored_proc->type == PROC_TYPE_DSL) {
free_proc(stored_proc->proc.dsl_proc);
}
proc_free(stored_proc);
xSemaphoreGive(running_tasks_mutex);
return -1;
}

// Add task to array
running_tasks = proc_realloc(running_tasks, ++running_tasks_count * sizeof(task_t));
running_tasks[running_tasks_count - 1] = (task_t){.proc = detached_proc, .task_handle = task_handle};
running_tasks[running_tasks_count - 1] = (task_t){.proc_union = stored_proc, .task_handle = task_handle};
xSemaphoreGive(running_tasks_mutex);

return 0;
Expand Down
Loading

0 comments on commit bc73664

Please sign in to comment.