diff --git a/fanout.c b/fanout.c index b18de4f5ae181e029adbb9bedaf50b1051105899..c52a17a0d1c031e2970910ab541a9892d38e3b3d 100644 --- a/fanout.c +++ b/fanout.c @@ -54,6 +54,7 @@ int channel_has_subscription (struct channel *c); struct channel *get_channel (const char *channel_name); void remove_channel (struct channel *c); void destroy_channel (struct channel *c); +u_int channel_count (); struct client *get_client (int fd); @@ -62,12 +63,14 @@ void shutdown_client (struct client *c); void destroy_client (struct client *c); void client_write (struct client *c, const char *data); void client_process_input_buffer (struct client *c); +u_int client_count (); struct subscription *get_subscription (struct client *c, struct channel *channel); void remove_subscription (struct subscription *s); void destroy_subscription (struct subscription *s); +u_int subscription_count (); void announce (const char *channel_name, const char *message); @@ -77,7 +80,7 @@ void unsubscribe (struct client *c, const char *channel_name); // GLOBAL VARS fd_set readset, tempset; -int max; +u_int max; static int daemonize = 0; FILE *logfile; @@ -114,6 +117,7 @@ int main (int argc, char *argv[]) {"logfile", 1, 0, 0}, {"pidfile", 1, 0, 0}, {"debug-level", 1, 0, 0}, + {"help", 0, 0, 0}, {NULL, 0, NULL, 0} }; @@ -144,6 +148,24 @@ int main (int argc, char *argv[]) case 4: debug_level = atoi (optarg); break; + case 5: + printf("Usage: fanout [options...]\n"); + printf("pubsub style fanout server\n\n"); + printf("Recognized options are:\n"); + printf(" --port=PORT port to run the service\ + on\n"); + printf(" --daemon fork to background\n"); + printf(" --logfile=PATH path to log file\n"); + printf(" --pidfile=PATH path to pid file\n"); + printf(" --debug-level=LEVEL verbosity level\n"); + printf(" 0 = ERROR (default)\n"); + printf(" 1 = WARNING\n"); + printf(" 2 = INFO\n"); + printf(" 3 = DEBUG\n"); + printf(" --help show this info and exit\ +\n"); + exit (EXIT_SUCCESS); + break; } break; default: @@ -467,6 +489,19 @@ void destroy_channel (struct channel *c) } +u_int channel_count () +{ + struct channel *channel_i = channel_head; + u_int count = 0; + + while (channel_i != NULL) { + count++; + channel_i = channel_i->next; + } + return count; +} + + struct client *get_client (int fd) { struct client *client_i = NULL; @@ -563,6 +598,33 @@ void client_process_input_buffer (struct client *c) client_write (c, message); free (message); message = NULL; + } else if ( ! strcmp (line, "stats")) { + //max connections + u_int max_connection_count = max; + if (daemonize) { + max_connection_count++; + } else { + max_connection_count -= 3; + } + + //current connections + u_int current_client_count = client_count (); + + //channels + u_int current_channel_count = channel_count (); + + //subscriptions + u_int current_subscription_count = subscription_count (); + //client + //messages + + asprintf (&message, "max connections: %d\ncurrent connections: %d\n\ +current channels: %d\ncurrent subscriptions: %d\n", max_connection_count, + current_client_count, current_channel_count, + current_subscription_count); + client_write (c, message); + free (message); + message = NULL; } else { action = strtok (line, " "); channel = strtok (NULL, " "); @@ -601,6 +663,19 @@ void client_process_input_buffer (struct client *c) } +u_int client_count () +{ + struct client *client_i = client_head; + u_int count = 0; + + while (client_i != NULL) { + count++; + client_i = client_i->next; + } + return count; +} + + struct subscription *get_subscription (struct client *c, struct channel *channel) { @@ -633,6 +708,19 @@ void destroy_subscription (struct subscription *s) } +u_int subscription_count () +{ + struct subscription *subscription_i = subscription_head; + u_int count = 0; + + while (subscription_i != NULL) { + count++; + subscription_i = subscription_i->next; + } + return count; +} + + void announce (const char *channel, const char *message) { fanout_debug (3, "attempting to announce message %s to channel %s\n",