diff --git a/include/data_proc_tracker.h b/include/data_proc_tracker.h index abef5071bfad79bd0c27a545a606e31a877c7f3f..ca212b28e6a33ee4df9dc0a032dbffb532ca1554 100644 --- a/include/data_proc_tracker.h +++ b/include/data_proc_tracker.h @@ -29,6 +29,8 @@ struct proc_tracker { unsigned long pt_track_get_id(struct proc_tracker *pt); +int pt_track_tasks_pending(struct proc_tracker *pt); + int pt_track_get_usage(struct proc_tracker *pt); int pt_track_level_critical(struct proc_tracker *pt); diff --git a/lib/data_proc_net.c b/lib/data_proc_net.c index ce84f59d5d63635ad2dcf1932dfda499c321e1f3..0273ccc55d6c9f6820019902d3dd9375e8d8850d 100644 --- a/lib/data_proc_net.c +++ b/lib/data_proc_net.c @@ -29,6 +29,43 @@ * is the responsibility of the user, who must ensure compatible or * interpretable data buffers. * + * + * + * + * + * Node procesing can be done by simply calling: + * + * pn_process_next() + * + * which does all the work and returns the number of tasks processed in the next + * pending node. + * + * + * This is equivalent to doing it manually: + * + * pt = pn_get_next_pending_tracker(pn) + * + * while (1) { + * t = pn_get_next_pending_task(pt) + * ret = pt->op(pt_get_pend_step_op_code(t), t); + * if (!pn_eval_task_status(pn, pt, t, ret)) + * pn_node_to_queue_tail(pn, pt); + * abort_processing: + * } + * } + * + * This may be used when the call to pt->op() needs special control. + * + * + * Input and output nodes are special, they must be processed explicitly by + * calling + * pn_process_inputs(pn); + * and + * pn_process_outputs(pn); + * + * This allows the operator of the processing network to control the I/O rate. + * + * */ #include <kernel/printk.h> @@ -83,6 +120,17 @@ static struct proc_tracker *pn_find_tracker(struct proc_net *pn, return NULL; } + +/** + * @brief propagate a task to its next tracker node + * + * @param pn a struct proc_net + * + * @param t a struct proc_task + * + * @return -1 on error, 0 otherwise + */ + static int pn_task_to_next_node(struct proc_net *pn, struct proc_task *t) { unsigned long op; @@ -122,107 +170,189 @@ static int pn_task_to_next_node(struct proc_net *pn, struct proc_task *t) } -int pn_process_next(struct proc_net *pn) +/** + * @brief move a tracker node to the end of the processing net queue + * + * @param pn a struct proc_net + * @param pt a struct proc_tracker + */ + +void pn_node_to_queue_tail(struct proc_net *pn, struct proc_tracker *pt) { - int ret; + list_move_tail(&pt->node, &pn->nodes); +} + + +/** + * @brief locate the first tracker that holds at least one task + * + * @param pn a struct proc_net + * + * @return a pointer to a struct proc_task or NULL if none was found + * + * @note empty trackers are moved to the end of the queue + * + * ideally, the tracker nodes would already be sorted so that the most critical + * tracker was the first item + * XXX implement that sometime... + */ + +struct proc_tracker *pn_get_next_pending_tracker(struct proc_net *pn) +{ + size_t cnt = 0; - struct proc_task *t = NULL; struct proc_tracker *pt; + struct proc_tracker *p_tmp; - unsigned long op; + if (list_empty(&pn->nodes)) + return NULL; - /* locate the first tracker that holds at least one task and process it - * ideally, this would be sorted so that the most critical tracker - * was the first item - * XXX implement that sometime... - */ - list_for_each_entry(pt, &pn->nodes, node) { + list_for_each_entry_safe(pt, p_tmp, &pn->nodes, node) { - t = pt_track_get(pt); + if (cnt++ > pn->n) + break; - if (t) + if (!pt_track_tasks_pending(pt)) + pn_node_to_queue_tail(pn, pt); + else break; } + return pt; +} - while (1) { - if (!t) - break; /* nothing to do */ - /* current step's op code */ - op = pt_get_pend_step_op_code(t); +/** + * @brief retrieve the next pending task in a tracker + * + * @param pt a struct proc_tracker + * + * @returns a struct proc_task or NULL if nothing pending + */ - /* XXX maybe eval return code, e.g. for signalling abort of - * current task node processing */ - ret = pt->op(op, t); - - switch (ret) { - case PN_TASK_SUCCESS: - /* move to next stage */ - pr_debug(MSG "task successful\n"); - pt_next_pend_step_done(t); - pn_task_to_next_node(pn, t); - break; +struct proc_task *pn_get_next_pending_task(struct proc_tracker *pt) +{ + return pt_track_get(pt); +} - case PN_TASK_STOP: - /* success, but abort processing node */ - pr_debug(MSG "task processing stop\n"); - pt_next_pend_step_done(t); - pn_task_to_next_node(pn, t); - goto loop_done; - case PN_TASK_DETACH: - pr_debug(MSG "task detached\n"); - /* task is now tracked by op function, do nothing */ - break; +/** + * @brief evaluate the return code of a task and signal how to proceed + * + * @param pn a struct proc_net + * @param pt a struct proc_tracker + * @param t a struct proc_task + * @þaram ret a tasks op function's return code + * + * @returns 1 if processing of the current tracker may continue or 0 to abort + */ - case PN_TASK_RESCHED: - /* move back to queue and abort */ - pr_debug(MSG "task rescheduled\n"); - pt_track_put(pt, t); - goto loop_done; - - case PN_TASK_SORTSEQ: - pr_debug(MSG "sort tasks\n"); - /* reschedule and sort tasks by seq counter */ - pt_track_put(pt, t); - pt_track_sort_seq(pt); - goto loop_done; - break; +int pn_eval_task_status(struct proc_net *pn, struct proc_tracker *pt, + struct proc_task *t, int ret) +{ + switch (ret) { + case PN_TASK_SUCCESS: + /* move to next stage */ + pr_debug(MSG "task successful\n"); + pt_next_pend_step_done(t); + pn_task_to_next_node(pn, t); + goto task_continue; + + case PN_TASK_STOP: + /* success, but abort processing node */ + pr_debug(MSG "task processing stop\n"); + pt_next_pend_step_done(t); + pn_task_to_next_node(pn, t); + goto task_abort; + + case PN_TASK_DETACH: + pr_debug(MSG "task detached\n"); + /* task is now tracked by op function, do nothing */ + goto task_continue; + + case PN_TASK_RESCHED: + /* move to back to queue and abort */ + pr_debug(MSG "task rescheduled\n"); + pt_track_put(pt, t); + goto task_abort; + + case PN_TASK_SORTSEQ: + pr_debug(MSG "sort tasks\n"); + /* reschedule and sort tasks by seq counter */ + pt_track_put(pt, t); + pt_track_sort_seq(pt); + goto task_abort; + + case PN_TASK_DESTROY: + pr_debug(MSG "destroy task\n"); + /* something is wrong, destroy this task */ + pt_destroy(t); + goto task_continue; + + default: + pr_err(MSG "Invalid retval %d, destroying task\n", ret); + pt_destroy(t); + break; + } - case PN_TASK_DESTROY: - pr_debug(MSG "destroy task\n"); - /* something is wrong, destroy this task */ - pt_destroy(t); - break; - default: - pr_err(MSG "Invalid retval %d, destroying task\n", ret); - pt_destroy(t); + +task_continue: + return 1; + +task_abort: + return 0; +} + + +/** + * @brief process tasks in the next tracker node that holds at least one task + * + * @param pn a struct proc_net + * + * @returns the number executed tasks for the tracker node + * + * @note this can be used to execute a processing cycle in a single call as + * opposed to doing it explicitly step by step + */ + +int pn_process_next(struct proc_net *pn) +{ + int ret; + int cnt = 0; + + struct proc_task *t = NULL; + struct proc_tracker *pt; + + + pt = pn_get_next_pending_tracker(pn); + if (!pt) + return cnt; + + while (1) { + t = pn_get_next_pending_task(pt); + if (!t) break; - } - /* next */ - t = pt_track_get(pt); + cnt++; + + ret = pt->op(pt_get_pend_step_op_code(t), t); + + if (!pn_eval_task_status(pn, pt, t, ret)) + break; } -loop_done: /* move processing task to end of queue */ /* XXX should insert that based on critical level/fill state */ - if (pt) - list_move_tail(&pt->node, &pn->nodes); - + pn_node_to_queue_tail(pn, pt); - return 0; + return cnt; } - - - /** * @brief add a task to the input of the network */ diff --git a/lib/data_proc_tracker.c b/lib/data_proc_tracker.c index b3eeba292e516ba2311da5d5bc86659815032b76..a27fceb7786d250a313e020508f51ff916ca766a 100644 --- a/lib/data_proc_tracker.c +++ b/lib/data_proc_tracker.c @@ -59,6 +59,23 @@ int pt_track_get_usage(struct proc_tracker *pt) } +/** + * @brief check for pending tasks in a tracker + * + * @param pt a struct proc_tracker + * + * @returns 1 if tasks pending, 0 otherwise + */ + +int pt_track_tasks_pending(struct proc_tracker *pt) +{ + if (pt && list_filled(&pt->tasks)) + return 1; + + return 0; +} + + /** * @brief add a new item to the processing tracker *