diff --git a/fanout.c b/fanout.c index 73e42930a2ebbfc5b8dd7869615c9ff8d892994f..151df5b4f2c1f0162fd52c7fc007e6e7321287b3 100644 --- a/fanout.c +++ b/fanout.c @@ -13,6 +13,8 @@ #include <pwd.h> #include <grp.h> #include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> #include <sys/stat.h> #include <netinet/in.h> #include <time.h> @@ -59,6 +61,7 @@ void str_swap_free (char **target, char *source); char *str_append (char *target, const char *data); void fanout_error(const char *msg); void fanout_debug (int level, const char *format, ...); +char *getsocketpeername(int fd); int channel_exists (const char *channel_name); int channel_has_subscription (struct channel *c); @@ -212,17 +215,19 @@ ice on\n"); printf(" 1986 (default)\n"); printf(" --run-as=USER[:GROUP] drop permissions to \ defined levels\n"); - printf(" --daemon fork to background\n"); + printf(" --daemon fork to background\n\ +"); printf(" --client-limit=LIMIT max connections\n"); printf(" BEWARE ulimit \ restrictions\n"); - printf(" you may adjust it using\ - ulimit -n X\n"); + printf(" you may adjust it us\ +ing ulimit -n X\n"); printf(" or sysctl -w \ fs.file-max=100000\n"); printf(" --logfile=PATH path to log file\n"); - printf(" --max-logfile-size=SIZE logfile size in MB\n"); + printf(" --max-logfile-size=SIZE logfile size in MB\n\ +"); printf(" --pidfile=PATH path to pid file\n"); printf(" --debug-level=LEVEL verbosity level\n"); printf(" \ @@ -230,8 +235,8 @@ fs.file-max=100000\n"); printf(" 1 = WARNING\n"); printf(" 2 = INFO\n"); printf(" 3 = DEBUG\n"); - printf(" --help show this info and exit\ -\n"); + printf(" --help show this info and e\ +xit\n"); exit (EXIT_SUCCESS); break; //client-limit @@ -463,8 +468,9 @@ fs.file-max=100000\n"); } clilen = sizeof (cli_addr); - if ((client_i->fd = accept (srvsock, (struct sockaddr *)&cli_addr, - &clilen)) == -1) { + if ((client_i->fd = accept (srvsock, + (struct sockaddr *)&cli_addr, + &clilen)) == -1) { fanout_debug (0, "%s\n", strerror (errno)); free (client_i); fanout_error ("failed on accept ()"); @@ -474,7 +480,8 @@ fs.file-max=100000\n"); int current_count = client_count (); if (client_limit > 0 && current_count >= client_limit) { - fanout_debug (1, "hit connection limit of: %d\n", client_limit); + fanout_debug (1, "hit connection limit of: %d\n", + client_limit); send (client_i->fd, "debug!busy\n", strlen ("debug!busy\n"), 0); close (client_i->fd); @@ -491,7 +498,8 @@ resetting counter\n"); //add new socket to watch list ev.events = EPOLLIN; ev.data.fd = client_i->fd; - if (epoll_ctl (epollfd, EPOLL_CTL_ADD, client_i->fd, &ev) == -1) { + if (epoll_ctl (epollfd, EPOLL_CTL_ADD, + client_i->fd, &ev) == -1) { fanout_error ("epoll_ctl: srvsock"); } @@ -511,14 +519,15 @@ resetting counter\n"); max_client_count = current_count; } - fanout_debug (2, "client socket connected\n"); + fanout_debug (2, "client socket %d connected from %s\n", + client_i->fd, getsocketpeername (client_i->fd)); client_write (client_i, "debug!connected...\n"); subscribe (client_i, "all"); //stats if (clients_count == ULLONG_MAX) { - fanout_debug (1, "wow, you've accepted alot of connections..\ - resetting counter\n"); + fanout_debug (1, "wow, you've accepted alot of connections.\ +.resetting counter\n"); clients_count = 0; } clients_count++; @@ -529,7 +538,8 @@ resetting counter\n"); while (client_i != NULL) { if (efd == client_i->fd) { // Process data from socket i - fanout_debug (3, "processing client %d\n", client_i->fd); + fanout_debug (3, "processing client %d\n", + client_i->fd); memset (buffer, 0, sizeof (buffer)); res = recv (client_i->fd, buffer, 1024, 0); if (res <= 0) { @@ -539,17 +549,19 @@ resetting counter\n"); client_i = client_i->next; //del socket from watch list - if (epoll_ctl (epollfd, EPOLL_CTL_DEL, client_tmp->fd, &ev) == -1) { + if (epoll_ctl (epollfd, EPOLL_CTL_DEL, + client_tmp->fd, &ev) == -1) { fanout_error ("epoll_ctl: srvsock"); } shutdown_client (client_tmp); continue; } else { // Process data in buffer - fanout_debug (3, "%d bytes read: [%.*s]\n", res, (res - 1), - buffer); + fanout_debug (3, "%d bytes read: [%.*s]\n", res, + (res - 1), buffer); client_i->input_buffer = str_append ( - client_i->input_buffer, buffer); + client_i->input_buffer, + buffer); client_process_input_buffer (client_i); } } @@ -680,6 +692,16 @@ void fanout_debug (int level, const char *format, ...) free (message); } +char *getsocketpeername (int fd) +{ + struct sockaddr_in m_addr; + socklen_t len; + + len = sizeof m_addr; + getpeername(fd, (struct sockaddr*)&m_addr, &len); + return inet_ntoa(m_addr.sin_addr); +} + int channel_exists (const char *channel_name) { @@ -777,7 +799,8 @@ struct client *get_client (int fd) void remove_client (struct client *c) { - fanout_debug (3, "removing client %d from service\n", c->fd); + fanout_debug (3, "removing client %d connected from %s from service\n", + c->fd, getsocketpeername (c->fd)); if (c->next != NULL) { c->next->previous = c->previous; }