diff --git a/include/data_proc_net.h b/include/data_proc_net.h index f7a5dabe62b6d94f0948b0057cbb833a18ed42c9..7b9314a976331a6f2ba4ddfe42de8d289111996d 100644 --- a/include/data_proc_net.h +++ b/include/data_proc_net.h @@ -16,17 +16,28 @@ #define PN_OP_NODE_OUT 0x00000000 +/* return codes for op functions */ +#define PN_TASK_SUCCESS 0 /* move to next stage */ +#define PN_TASK_STOP 1 /* success, but abort processing node */ +#define PN_TASK_DETACH 2 /* task is now tracked by op function */ +#define PN_TASK_RESCHED 3 /* move back to queue */ +#define PN_TASK_SORTSEQ 4 /* reschedule and sort tasks by seq counter */ +#define PN_TASK_DESTROY 5 /* something is wrong, destroy this task */ + + + struct proc_net; + int pt_track_execute_next(struct proc_tracker *pt); void pn_input_task(struct proc_net *pn, struct proc_task *t); int pn_process_next(struct proc_net *pn); int pn_process_inputs(struct proc_net *pn); +int pn_process_outputs(struct proc_net *pn); -int pn_create_output_node(struct proc_net *pn, - int (*op)(unsigned long op_code, struct proc_task *)); +int pn_create_output_node(struct proc_net *pn, op_func_t op); int pn_add_node(struct proc_net *pn, struct proc_tracker *pt); -struct proc_net *pn_create(size_t n_in_tasks_crit, size_t n_out_tasks_crit); +struct proc_net *pn_create(void); void pn_destroy(struct proc_net *pn); #endif /* _DATA_PROC_NET_H_ */ diff --git a/include/data_proc_tracker.h b/include/data_proc_tracker.h index e8ee2e9954a81ac8cb3abeb33b880680de7c15d4..abef5071bfad79bd0c27a545a606e31a877c7f3f 100644 --- a/include/data_proc_tracker.h +++ b/include/data_proc_tracker.h @@ -9,14 +9,18 @@ #include <list.h> #include <data_proc_task.h> + + +typedef int (*op_func_t)(unsigned long op_code, struct proc_task *); + struct proc_tracker { struct list_head tasks; size_t n_tasks; size_t n_tasks_crit; unsigned long op_code; - - int (*op)(unsigned long op_code, struct proc_task *); + + op_func_t op; struct list_head node; /* to be used for external tracking */ @@ -24,7 +28,7 @@ struct proc_tracker { unsigned long pt_track_get_id(struct proc_tracker *pt); - + int pt_track_get_usage(struct proc_tracker *pt); int pt_track_level_critical(struct proc_tracker *pt); @@ -37,9 +41,7 @@ struct proc_task *pt_track_get(struct proc_tracker *pt); void pt_track_sort_seq(struct proc_tracker *pt); -struct proc_tracker *pt_track_create(int (*op)(unsigned long op_code, - struct proc_task *), - unsigned long op_code, +struct proc_tracker *pt_track_create(op_func_t op, unsigned long op_code, size_t n_tasks_crit); void pt_track_destroy(struct proc_tracker *pt); diff --git a/lib/data_proc_net.c b/lib/data_proc_net.c index 543ce066df88579f3b5a44bd72b8ab07da839a13..ce84f59d5d63635ad2dcf1932dfda499c321e1f3 100644 --- a/lib/data_proc_net.c +++ b/lib/data_proc_net.c @@ -2,7 +2,32 @@ * @file lib/data_proc_net.c * * - * This is a data processing network + * This can be used to create data processing networks. + * + * Each node in the network is a data processing tracker with a particular op + * code. Tasks created with pt_create() are inserted into the network via the + * input node and exit the network via the output node. Each processing task + * is forwarded through the nodes as defined in its sequence of steps + * (via pt_add_step()), which form the processing chain the task must pass + * in order to be completed from the "chain link nodes" of the network. + * + * The steps are defined as "op codes" that must match an op code of a node in + * the network. If the network encounters an unknown op code, the task is + * destroyed, otherwise it is passed on to the next node until all processing is + * done and the processing task is moved to the output node. + * + * The op code processing function is handed the task and steps are taken + * depening on its return code. Op code processors may simply pass the processed + * task to the next stage, reschedule, or command to stop processing the current + * node etc. + * + * Since it may be necessary to collect and merge multiple tasks, an op code + * processor may also take over tracking of tasks and or modify their step list + * on the fly. + * + * The format of the data passed with each task and in between processing nodes + * is the responsibility of the user, who must ensure compatible or + * interpretable data buffers. * */ @@ -15,18 +40,24 @@ #include <data_proc_net.h> + +#define MSG "PN: " + + struct proc_net { struct proc_tracker *in; struct proc_tracker *out; struct list_head nodes; - + size_t n; -}; +}; -static int pn_dummy_op(unsigned long op_code, struct proc_task *pt) +static int pn_dummy_op(unsigned long op_code, struct proc_task *t) { + pt_destroy(t); + return 0; } @@ -34,7 +65,7 @@ static int pn_dummy_op(unsigned long op_code, struct proc_task *pt) * @brief locate a tracker by op code * * @returns tracker or NULL if not found - * + * * @note this is not very efficient, especially for large numbers of nodes... */ @@ -52,19 +83,55 @@ static struct proc_tracker *pn_find_tracker(struct proc_net *pn, return NULL; } +static int pn_task_to_next_node(struct proc_net *pn, struct proc_task *t) +{ + unsigned long op; + + static struct proc_tracker *pt_out; + + + if (!pt_out) + pt_out = list_entry(pn->nodes.next, struct proc_tracker, node); + + + /* next steps's op code */ + op = pt_get_pend_step_op_code(t); + + if (!op) { + pt_track_put(pn->out, t); + return 0; + } + + if (pt_out->op_code != op) { + pt_out = pn_find_tracker(pn, op); + + /* this should not happen */ + if (!pt_out) { + pr_crit("Error, no such op code, destroying task\n"); + + pt_destroy(t); + + return -1; + } + } + + /* move to next matching node */ + pt_track_put(pt_out, t); + + return 0; +} int pn_process_next(struct proc_net *pn) { + int ret; + struct proc_task *t = NULL; struct proc_tracker *pt; - static struct proc_tracker *pt_out; unsigned long op; - - if (!pt_out) - pt = list_entry(pn->nodes.next, struct proc_tracker, node); + /* locate the first tracker that holds at least one task and process it * ideally, this would be sorted so that the most critical tracker @@ -80,7 +147,7 @@ int pn_process_next(struct proc_net *pn) break; } - + while (1) { if (!t) break; /* nothing to do */ @@ -90,41 +157,59 @@ int pn_process_next(struct proc_net *pn) /* XXX maybe eval return code, e.g. for signalling abort of * current task node processing */ - pt->op(op, t); - + ret = pt->op(op, t); + + switch (ret) { + case PN_TASK_SUCCESS: + /* move to next stage */ + pr_debug(MSG "task successful\n"); + pt_next_pend_step_done(t); + pn_task_to_next_node(pn, t); + break; - pt_next_pend_step_done(t); + case PN_TASK_STOP: + /* success, but abort processing node */ + pr_debug(MSG "task processing stop\n"); + pt_next_pend_step_done(t); + pn_task_to_next_node(pn, t); + goto loop_done; - /* next steps's op code */ - op = pt_get_pend_step_op_code(t); - - if (!op) { - pt_track_put(pn->out, t); - } else if (pt_out->op_code != op) { - pt_out = pn_find_tracker(pn, op); + case PN_TASK_DETACH: + pr_debug(MSG "task detached\n"); + /* task is now tracked by op function, do nothing */ + break; - /* this should not happen */ - if (!pt_out) { - pr_crit("Error, no such op code, " - "destroying task\n"); + case PN_TASK_RESCHED: + /* move back to queue and abort */ + pr_debug(MSG "task rescheduled\n"); + pt_track_put(pt, t); + goto loop_done; + + case PN_TASK_SORTSEQ: + pr_debug(MSG "sort tasks\n"); + /* reschedule and sort tasks by seq counter */ + pt_track_put(pt, t); + pt_track_sort_seq(pt); + goto loop_done; + break; - pt_destroy(t); - - /* reset */ - pt_out = list_entry(pn->nodes.next, - struct proc_tracker, node); - - t = pt_track_get(pt); - continue; - } + case PN_TASK_DESTROY: + pr_debug(MSG "destroy task\n"); + /* something is wrong, destroy this task */ + pt_destroy(t); + break; + default: + pr_err(MSG "Invalid retval %d, destroying task\n", ret); + pt_destroy(t); + break; } - /* move to next matching node */ - pt_track_put(pt_out, t); - + /* next */ t = pt_track_get(pt); } +loop_done: + /* move processing task to end of queue */ /* XXX should insert that based on critical level/fill state */ if (pt) @@ -146,7 +231,7 @@ void pn_input_task(struct proc_net *pn, struct proc_task *t) { if (!pn) return; - + if (!t) return; @@ -168,12 +253,13 @@ int pn_process_inputs(struct proc_net *pn) static struct proc_tracker *pt; + if (list_empty(&pn->nodes)) return -1; - + if (!pt) pt = list_entry(pn->nodes.next, struct proc_tracker, node); - + while (1) { t = pt_track_get(pn->in); @@ -189,11 +275,11 @@ int pn_process_inputs(struct proc_net *pn) "destroying task\n"); pt_destroy(t); - + /* reset */ pt = list_entry(pn->nodes.next, struct proc_tracker, node); - + t = pt_track_get(pt); continue; } @@ -206,7 +292,40 @@ int pn_process_inputs(struct proc_net *pn) return 0; } - +/** + * @brief process tasks in the output node + * + * @returns number of output tasks processed + * + * @note the dummy output op runs pt_destroy() on tasks, which will leave + * any data buffers untouched; user-defined output op functions need + * to do their own cleanup routine + * + */ + +int pn_process_outputs(struct proc_net *pn) +{ + int n = 0; + + struct proc_task *t; + + + while (1) { + t = pt_track_get(pn->out); + + if (!t) + break; + + /* XXX maybe eval return code, e.g. for signalling abort of + * current task node processing */ + pn->out->op(PN_OP_NODE_OUT, t); + n++; + } + + return n; +} + + /** * @brief create an output node of the network * @@ -216,21 +335,22 @@ int pn_process_inputs(struct proc_net *pn) * original node is left intact */ -int pn_create_output_node(struct proc_net *pn, - int (*op)(unsigned long op_code, struct proc_task *)) +int pn_create_output_node(struct proc_net *pn, op_func_t op) { struct proc_tracker *pt; pt = pt_track_create(op, PN_OP_NODE_OUT, 1); - + if (!pt) return -ENOMEM; if (pn->out) pt_track_destroy(pn->out); + pn->out = pt; + return 0; } @@ -252,7 +372,7 @@ int pn_add_node(struct proc_net *pn, struct proc_tracker *pt) if (pt->op_code == PN_OP_NODE_IN) return -EINVAL; - + if (pt->op_code == PN_OP_NODE_OUT) return -EINVAL; @@ -273,27 +393,23 @@ int pn_add_node(struct proc_net *pn, struct proc_tracker *pt) * * @return processing network or NULL on error * - * @note this creates a default output node that does nothing and just - * accumulates tasks + * @note this creates a default output node that does nothing but run + * pt_destroy() on tasks */ -struct proc_net *pn_create(size_t n_in_tasks_crit, size_t n_out_tasks_crit) +struct proc_net *pn_create(void) { struct proc_net *pn; - if (!n_in_tasks_crit) - return NULL; - - if (!n_out_tasks_crit) - return NULL; - pn = (struct proc_net *) kzalloc(sizeof(struct proc_net)); if (!pn) goto error; - + /* critical levels are set to 1, input and output nodes are only run + * explicitly anyways + */ /* the input node just accepts tasks and distributes them to the * appropriate trackers in the network */ @@ -302,7 +418,7 @@ struct proc_net *pn_create(size_t n_in_tasks_crit, size_t n_out_tasks_crit) if (!pn->in) goto cleanup; - /* create a default output node that does nothing */ + /* create a default output node that does nothing but pt_destroy() */ pn->out = pt_track_create(pn_dummy_op, PN_OP_NODE_OUT, 1); if (!pn->out) goto cleanup; @@ -325,13 +441,13 @@ error: /** - * @brief destroy a processing network + * @brief destroy a processing network * * @param pn a struct proc_net * * * @note the data pointers in the processing tasks are untouched, - * see also pt_destroy() + * see also pt_destroy() */ void pn_destroy(struct proc_net *pn) @@ -348,7 +464,7 @@ void pn_destroy(struct proc_net *pn) pt_track_destroy(p_elem); } - + pt_track_destroy(pn->in); pt_track_destroy(pn->out); diff --git a/lib/data_proc_task.c b/lib/data_proc_task.c index 6f51742e2d1678a4919a972f4e954cce94d185f7..7c1137a7dbd29f4ee9b932283572bf90175048b9 100644 --- a/lib/data_proc_task.c +++ b/lib/data_proc_task.c @@ -375,7 +375,7 @@ void pt_set_data(struct proc_task *t, void *data, size_t nmemb) * @brief set the type id of a processing task * * @param t a struct proc_task - * @param type the type id to set + * @param type the type id to set * */ @@ -408,7 +408,7 @@ void pt_set_seq(struct proc_task *t, unsigned long seq) * * @param an arbitrary type identifier * @param an arbitrary sequence number - * + * * @return a pointer to the newly created task or NULL on error */ diff --git a/lib/data_proc_tracker.c b/lib/data_proc_tracker.c index 708ba05cbd7097912a7b9ed8f9dd0c0f27f8fc6e..b3eeba292e516ba2311da5d5bc86659815032b76 100644 --- a/lib/data_proc_tracker.c +++ b/lib/data_proc_tracker.c @@ -32,7 +32,7 @@ unsigned long pt_track_get_op_code(struct proc_tracker *pt) /** - * @brief check if the tracker is above its critical number of tasks + * @brief check if the tracker is above its critical number of tasks * * @param pt a struct proc_tracker * @@ -50,7 +50,7 @@ int pt_track_level_critical(struct proc_tracker *pt) * * @param pt a struct proc_tracker * - * return number of tasks tracked + * return number of tasks tracked */ int pt_track_get_usage(struct proc_tracker *pt) @@ -79,7 +79,7 @@ int pt_track_put(struct proc_tracker *pt, struct proc_task *t) if (!pt) return -EINVAL; - + if (!t) return -EINVAL; @@ -111,7 +111,7 @@ int pt_track_put_force(struct proc_tracker *pt, struct proc_task *t) { if (!pt) return -EINVAL; - + if (!t) return -EINVAL; @@ -139,11 +139,11 @@ struct proc_task *pt_track_get(struct proc_tracker *pt) if (list_empty(&pt->tasks)) return NULL; - + t = list_entry(pt->tasks.next, struct proc_task, node); - + list_del(&t->node); - + pt->n_tasks--; return t; @@ -185,7 +185,7 @@ int pt_track_execute_next(struct proc_tracker *pt) void pt_track_sort_seq(struct proc_tracker *pt) { - printk("TODO: list_sort\n"); + printk("TODO: list_sort not implemented\n"); } @@ -202,9 +202,8 @@ void pt_track_sort_seq(struct proc_tracker *pt) * @return processing tracker or NULL on error */ -struct proc_tracker *pt_track_create(int (*op)(unsigned long op_code, - struct proc_task *), - unsigned long op_code, size_t n_tasks_crit) +struct proc_tracker *pt_track_create(op_func_t op, unsigned long op_code, + size_t n_tasks_crit) { struct proc_tracker *pt; @@ -224,7 +223,7 @@ struct proc_tracker *pt_track_create(int (*op)(unsigned long op_code, pt->op = op; pt->n_tasks_crit = n_tasks_crit; - + pt->op_code = op_code; INIT_LIST_HEAD(&pt->tasks); diff --git a/samples/proc_chain/proc_chain_demo.c b/samples/proc_chain/proc_chain_demo.c index e25ddb5578276ca19eeb76fe4f516dfcb0109bce..cdad4a06bfea0535b21a60a5b8f52174a779f4eb 100644 --- a/samples/proc_chain/proc_chain_demo.c +++ b/samples/proc_chain/proc_chain_demo.c @@ -20,12 +20,22 @@ #include <kernel/kernel.h> +#define CRIT_LEVEL 10 + +#define OP_ADD 0x1234 +#define OP_SUB 0x1235 +#define OP_MUL 0x1236 + +#define STEPS 3 + + + void *kzalloc(size_t size); void kfree(void *ptr); int printk(const char *fmt, ...); int pn_prepare_nodes(struct proc_net *pn); -void pn_new_input_task(struct proc_net *pn); +void pn_new_input_task(struct proc_net *pn, size_t n); int op_add(unsigned long op_code, struct proc_task *pt); int op_sub(unsigned long op_code, struct proc_task *pt); @@ -55,220 +65,128 @@ void kfree(void *ptr) free(ptr); } +int op_output(unsigned long op_code, struct proc_task *t) +{ + ssize_t i; + ssize_t n; + + unsigned int *p = NULL; + n = pt_get_nmemb(t); + printk("OUT: op code %d, %d items\n", op_code, n); -/* special op codes */ -#define OP_INPUT 0xFFFFFFFF -#define OP_OUTPUT 0x00000000 -/* a random base for op codes for this demo */ -#define OP_BASE 0x10000000 + if (!n) + goto exit; -#define TASKS 5 -#define NODES (STEPS + 2) /* one per steps/op code + input + output */ -#define NODE_IN STEPS -#define NODE_OUT (STEPS + 1) + p = (unsigned int *) pt_get_data(t); + if (!p) + goto exit; + for (i = 0; i < n; i++) { + printk("\t%d\n", p[i]); + } + +exit: + kfree(p); /* clean up our data buffer */ + pt_destroy(t); + return PN_TASK_SUCCESS; +} -#if 0 -void proc_tasks_prepare(void) +int op_add(unsigned long op_code, struct proc_task *t) { - int i; - int j; - int go; - int op; + ssize_t i; + ssize_t n; - struct proc_task *pt; - struct proc_tracker **ptt_nodes; + unsigned int *p; + n = pt_get_nmemb(t); + if (!n) + return PN_TASK_SUCCESS; - /* allocate references to the intermediate nodes */ - ptt_nodes = (struct proc_tracker **) - kzalloc(NODES * sizeof(struct proc_tracker *)); - BUG_ON(!ptt_nodes); - - /* create the intermediate tracker nodes, their ID is an op code*/ - for (i = 0; i < STEPS; i++) { - ptt_nodes[i] = pt_track_create(OP_BASE + i); - BUG_ON(!ptt_nodes[i]); + p = (unsigned int *) pt_get_data(t); - } + if (!p) /* we have elements but data is NULL, error*/ + return PN_TASK_DESTROY; + + printk("ADD: op code %d, %d items\n", op_code, n); - /* create the input and output nodes */ - - ptt_nodes[NODE_IN] = pt_track_create(OP_INPUT); - BUG_ON(!ptt_nodes[NODE_IN]); - ptt_nodes[NODE_OUT] = pt_track_create(OP_OUTPUT); - BUG_ON(!ptt_nodes[NODE_OUT]); - - - /* create a number of individual tasks and define some processing steps, - * then add the to the input stage - */ - - for (i = 0; i < TASKS; i++) { - /* create a task holding at most STEPS steps */ - pt = pt_create(NULL, 0, STEPS, 0, i); - BUG_ON(!pt); - - /* activate all steps with different op-codes */ - for (j = 0; j < STEPS; j++) - BUG_ON(pt_add_step(pt, OP_BASE + j, NULL)); - - /* add the task to the input tracker */ - pt_track_put(ptt_nodes[NODE_IN], pt); + for (i = 0; i < n; i++) { + p[i] += 10; } - /* now process the tasks, we "schedule" by looping over all the - * trackers until none have pending tasks left - */ - - go = 1; - while (go) { - go = 0; - for (i = 0; i < NODES; i++) { - while (1) { /* keep processing while the node holds - tasks */ - pt = pt_track_get(ptt_nodes[i]); - if (!pt) - break; - - printk("Processing task in node %d\n", i); - go++; /* our indicator */ - - switch (i) { - case NODE_IN: - /* the input node, just moves the task - * into the first processing node based - * on the first processing step op code - */ - op = pt_get_pend_step_op_code(pt); - printk("IN: move task to node %d\n", - op - OP_BASE); - /* in our demo, we get the correct node - * by calculating it from the OP_BASE - */ - pt_track_put(ptt_nodes[op - OP_BASE], pt); - break; - - case NODE_OUT: /* output node */ - printk("OUT: destroy task\n"); - pt_destroy(pt); - break; - - default: /* processing node */ - op = pt_get_pend_step_op_code(pt); - printk("PROC: simulating processing " - "operation %d\n", op - OP_BASE); - /* this step is now complete */ - pt_next_pend_step_done(pt); - - op = pt_get_pend_step_op_code(pt); - if (op) { - printk("PROC: move task to node" - " %d\n", - op - OP_BASE); - pt_track_put(ptt_nodes[op - OP_BASE], pt); - } else { - printk("PROC: last step, moving" - " to output node"); - pt_track_put(ptt_nodes[NODE_OUT], pt); - } - - break; - } - } - - } - printk("Scheduling cycle complete\n"); - } - - printk("Processing complete\n"); + return PN_TASK_SUCCESS; +} +int op_sub(unsigned long op_code, struct proc_task *t) +{ + ssize_t i; + ssize_t n; + unsigned int *p; -#if 0 - pt_dump_steps_todo(pt); - pt_dump_steps_done(pt); + n = pt_get_nmemb(t); - pt_next_pend_step_done(pt); - pt_next_pend_step_done(pt); - pt_dump_steps_todo(pt); - pt_dump_steps_done(pt); + if (!n) + return PN_TASK_SUCCESS; - pt_next_pend_step_done(pt); - pt_dump_steps_todo(pt); - pt_dump_steps_done(pt); + p = (unsigned int *) pt_get_data(t); - pt_rewind_steps_done(pt); - - pt_dump_steps_todo(pt); -#endif + if (!p) /* we have elements but data is NULL, error*/ + return PN_TASK_DESTROY; + printk("SUB: op code %d, %d items\n", op_code, n); + for (i = 0; i < n; i++) { + p[i] -= 2; + } - for (i = 0; i < NODES; i++) - pt_track_destroy(ptt_nodes[i]); - - kfree(ptt_nodes); + return PN_TASK_SUCCESS; } -#endif +int op_mul(unsigned long op_code, struct proc_task *t) +{ -#define CRIT_LEVEL 10 -#define CRIT_IN CRIT_LEVEL -#define CRIT_OUT CRIT_LEVEL + ssize_t i; + ssize_t n; -#define OP_ADD 0x1234 -#define OP_SUB 0x1235 -#define OP_MUL 0x1236 + unsigned int *p; -#define STEPS 3 + n = pt_get_nmemb(t); + if (!n) + return PN_TASK_SUCCESS; -int op_output(unsigned long op_code, struct proc_task *pt) -{ - printk("OUT: op code %d\n", op_code); - - return 0; -} + p = (unsigned int *) pt_get_data(t); -int op_add(unsigned long op_code, struct proc_task *pt) -{ - printk("ADD: op code %d\n", op_code); - - return 0; -} + if (!p) /* we have elements but data is NULL, error*/ + return PN_TASK_DESTROY; -int op_sub(unsigned long op_code, struct proc_task *pt) -{ - printk("SUB: op code %d\n", op_code); - - return 0; -} + printk("MUL: op code %d, %d items\n", op_code, n); -int op_mul(unsigned long op_code, struct proc_task *pt) -{ - printk("MUL: op code %d\n", op_code); + for (i = 0; i < n; i++) { + p[i] *= 3; + } - return 0; + + return PN_TASK_SUCCESS; } @@ -276,7 +194,7 @@ int pn_prepare_nodes(struct proc_net *pn) { struct proc_tracker *pt; - + /* create and add processing node trackers for the each operation */ pt = pt_track_create(op_add, OP_ADD, CRIT_LEVEL); @@ -289,7 +207,6 @@ int pn_prepare_nodes(struct proc_net *pn) pt = pt_track_create(op_mul, OP_MUL, CRIT_LEVEL); BUG_ON(!pt); - BUG_ON(pn_add_node(pn, pt)); BUG_ON(pn_create_output_node(pn, op_output)); @@ -299,12 +216,15 @@ int pn_prepare_nodes(struct proc_net *pn) -void pn_new_input_task(struct proc_net *pn) +void pn_new_input_task(struct proc_net *pn, size_t n) { struct proc_task *t; static int seq; + int i; + unsigned int *data; + t = pt_create(NULL, 0, STEPS, 0, seq++); @@ -315,23 +235,13 @@ void pn_new_input_task(struct proc_net *pn) BUG_ON(pt_add_step(t, OP_MUL, NULL)); - pn_input_task(pn, t); -} -void pn_new_input_task2(struct proc_net *pn) -{ - struct proc_task *t; - - static int seq; + data = kzalloc(sizeof(unsigned int) * n); + for (i = 0; i < n; i++) + data[i] = i; - t = pt_create(NULL, 0, STEPS, 0, seq++); - - BUG_ON(!t); - - BUG_ON(pt_add_step(t, OP_MUL, NULL)); - BUG_ON(pt_add_step(t, OP_SUB, NULL)); - BUG_ON(pt_add_step(t, OP_ADD, NULL)); + pt_set_data(t, data, n); pn_input_task(pn, t); @@ -340,26 +250,28 @@ void pn_new_input_task2(struct proc_net *pn) - int main(int argc, char **argv) { struct proc_net *pn; - pn = pn_create(CRIT_IN, CRIT_OUT); + pn = pn_create(); BUG_ON(!pn); pn_prepare_nodes(pn); - pn_new_input_task(pn); - pn_new_input_task2(pn); + pn_new_input_task(pn, 5); + pn_new_input_task(pn, 0); + pn_new_input_task(pn, 3); + pn_process_inputs(pn); - pn_process_inputs(pn); - + pn_process_next(pn); + pn_process_next(pn); + pn_process_next(pn); pn_process_next(pn); pn_process_next(pn); pn_process_next(pn); @@ -367,6 +279,7 @@ int main(int argc, char **argv) pn_process_next(pn); pn_process_next(pn); + pn_process_outputs(pn); return 0;