diff --git a/src/server/net.c b/src/server/net.c index f234ffb1e4a28dac1d375c6f14b78550759106fd..1de3a9c10a0c5040ff96515c0cd2ed983146a0fc 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -30,7 +30,10 @@ #include <gio/gio.h> #include <glib.h> -#define SERVER_MAX_TX_TH_PER_CON 10 +/* this number must be large enough to handle multiple subsequent submissions + * of packets for a connection + */ +#define SERVER_CON_POOL_SIZE 16 /* client connection data */ @@ -42,6 +45,7 @@ struct con_data { gboolean new; gboolean kick; GMutex lock; + GCancellable *ca; GThreadPool *pool; }; @@ -54,7 +58,6 @@ struct thread { /* tracks client connections */ static GList *con_list; -static GMutex netlock; /** @@ -69,6 +72,27 @@ static gsize get_pkt_size_peek(struct packet *pkt) } +/** + * @brief returns the host address as string + */ + +static gchar *net_get_host_string(GSocketConnection *con) +{ + GSocketAddress *addr; + GInetAddress *iaddr; + + + addr = g_socket_connection_get_remote_address(con, NULL); + iaddr = g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(addr)); + + return g_inet_address_to_string(iaddr); +} + + +/** + * @brief distribute a list of users to all clients + */ + static gboolean net_push_userlist_cb(gpointer data) { GList *elem; @@ -114,7 +138,7 @@ static gboolean net_push_userlist_cb(gpointer data) if (msg) { - ack_userlist(PKT_TRANS_ID_UNDEF, msg, strlen(msg)); + ack_userlist(PKT_TRANS_ID_UNDEF, (guchar *) msg, strlen(msg)); g_free(msg); } @@ -123,16 +147,83 @@ static gboolean net_push_userlist_cb(gpointer data) /** - * @brief drop a connection + * @brief if connected, disconnect the socket + */ + +static void try_disconnect_socket(struct con_data *c) +{ + gboolean ret = TRUE; + + GSocket *s; + GError *error = NULL; + + + if (!G_IS_SOCKET_CONNECTION(c->con)) + return; + + if (!g_socket_connection_is_connected(c->con)) + return; + + s = g_socket_connection_get_socket(c->con); + + if (g_socket_is_connected(s)) + ret = g_socket_close(s, &error); + + + if (!ret) { + if (error) { + g_warning("%s:%d %s", __func__, __LINE__, error->message); + g_clear_error(&error); + } + } +} + + +/** + * @brief initiate a connection drop */ -static void drop_connection(struct con_data *c) +static void drop_con_begin(struct con_data *c) +{ + + gchar *str; + + + str = net_get_host_string(c->con); + g_message("Initiating disconnect for %s (%s)", str, c->nick); + g_free(str); + + con_list = g_list_remove(con_list, c); + + /* signal operations to stop */ + g_cancellable_cancel(c->ca); + + g_thread_pool_free(c->pool, TRUE, FALSE); + c->pool = NULL; + + /* drop initial reference */ + g_object_unref(c->con); + + try_disconnect_socket(c); +} + + +/** + * @brief finalize a connection drop + */ + +static void drop_con_finalize(struct con_data *c) { gchar *buf; + + if (G_IS_OBJECT(c->con)) + return; + if (c->kick) { buf = g_strdup_printf("I kicked <tt><span foreground='#F1C40F'>" - "%s</span></tt> for being a lazy bum", + "%s</span></tt> for being a lazy bum " + "(client input saturated or timed out)", c->nick); } else { buf = g_strdup_printf("<tt><span foreground='#F1C40F'>" @@ -141,192 +232,123 @@ static void drop_connection(struct con_data *c) } net_server_broadcast_message(buf, NULL); + net_push_userlist_cb(NULL); - g_free(buf); g_free(c->nick); - - g_object_unref(c->con); - - con_list = g_list_remove(con_list, c); - - g_thread_pool_free(c->pool, TRUE, FALSE); - - /* BUG */ + g_free(buf); g_free(c); - - net_push_userlist_cb(NULL); } -static void do_send(gpointer data, gpointer user_data) -{ - struct con_data *c = (struct con_data *) user_data; - struct thread *th = (struct thread*) data; - GIOStream *stream; - GOutputStream *ostream; - GSocket *socket; +/** + * @brief send data on a connection + */ - GError *error = NULL; +static void do_send(gpointer data, gpointer user_data) +{ gboolean ret; + GOutputStream *os; - g_mutex_lock(&c->lock); - - if (!G_IS_IO_STREAM(c->con)) { - c->kick = TRUE; - goto bye; - } - - socket = g_socket_connection_get_socket(c->con); - if (!G_IS_SOCKET(socket)) { - c->kick = TRUE; - goto bye; - } - + struct thread *th; + struct con_data *c; - g_socket_set_timeout(socket, 10); - stream = G_IO_STREAM(c->con); - if (!G_IS_IO_STREAM(stream)) { - c->kick = TRUE; - goto bye; - } + th = (struct thread *) data; + c = (struct con_data *) user_data; - ostream = g_io_stream_get_output_stream(stream); - if (!G_IS_OUTPUT_STREAM(ostream)) { - c->kick = TRUE; - goto bye; - } - - ret = g_output_stream_write_all(ostream, th->buf, th->bytes, NULL, NULL, &error); + if (!G_IS_IO_STREAM(c->con)) + goto exit; - if (!G_IS_SOCKET(socket)) { - c->kick = TRUE; - goto bye; - } + g_object_ref(c->con); + os = g_io_stream_get_output_stream(G_IO_STREAM(c->con)); + g_mutex_lock(&c->lock); - if (!ret) { - if (error) { - g_warning("%s", error->message); - g_clear_error (&error); - } + ret = g_output_stream_write_all(os, th->buf, th->bytes, NULL, c->ca, + NULL); + if (!ret) c->kick = TRUE; - } -bye: - g_socket_set_timeout(socket, 0); g_mutex_unlock(&c->lock); + g_object_unref(c->con); + +exit: + /* if this was the last reference, call finalize */ + if (!G_IS_OBJECT(c->con)) + drop_con_finalize(c); g_free(th->buf); g_free(th); - } - /** * @brief send a packet on a connection + * + * @returns 0 on success, otherwise error */ -static gint net_send_internal(struct con_data *c, const char *pkt, gsize nbytes) +static gboolean net_send_internal(struct con_data *c, const char *pkt, gsize nbytes) { - gssize ret = 0; + gboolean ret; + struct thread *th; GError *error = NULL; - GIOStream *stream; - GOutputStream *ostream; + if (!c->pool) + return FALSE; - stream = G_IO_STREAM(c->con); + if (c->kick) + return FALSE; - if (!G_IS_IO_STREAM(stream)) - return -1; - if (c->kick) { - GSocket *socket; - - socket = g_socket_connection_get_socket(c->con); + if (g_thread_pool_get_num_threads(c->pool) >= SERVER_CON_POOL_SIZE) { - if (!g_socket_is_closed(socket)) { - ret = g_socket_close(socket, &error); - g_warning("Closed socket on timed out client"); + gchar *str; - if (!ret) { - if (error) { - g_warning("%s", error->message); - g_clear_error(&error); - } - } - } + str = net_get_host_string(c->con); - return -1; - - } - - - ostream = g_io_stream_get_output_stream(stream); - - - if (g_io_stream_is_closed(stream)) { - g_message("Error sending packet: stream closed\n"); - return -1; - } - - - if (!g_socket_connection_is_connected(c->con)) { - g_message("Error sending packet: socket not connected\n"); - return -1; - } - -#if 0 - /* set a 1 second socket timeout to detect broken connections */ - g_socket_set_timeout(g_socket_connection_get_socket(c->con), 1); - ret = g_output_stream_write_all(ostream, pkt, nbytes, NULL, NULL, &error); - /* set back to infinite */ - g_socket_set_timeout(g_socket_connection_get_socket(c->con), 0); - - if (!ret) { - if (error) { - g_warning("%s", error->message); - g_clear_error (&error); - } + g_message("Will kick client %s connected from %s: " + "dropped pkt, out of threads: %d of %d in use", + c->nick, str, + g_thread_pool_get_num_threads(c->pool), + SERVER_CON_POOL_SIZE); c->kick = TRUE; + g_free(str); + + return FALSE; } -#endif - if (g_thread_pool_get_num_threads(c->pool) < SERVER_MAX_TX_TH_PER_CON) - { - - struct thread *th; th = g_malloc(sizeof(struct thread)); - th->bytes = nbytes; - th->buf = g_memdup(pkt, nbytes); - ret = g_thread_pool_push(c->pool, (gpointer) th, &error); -// g_message("threads %d of %d", g_thread_pool_get_num_threads(c->pool), SERVER_MAX_TX_TH_PER_CON); + th->bytes = nbytes; + th->buf = g_memdup(pkt, nbytes); + ret = g_thread_pool_push(c->pool, (gpointer) th, &error); if (!ret) { + if (error) { - g_warning("%s", error->message); - g_clear_error (&error); + g_warning("%s:%d %s", __func__, __LINE__, + error->message); + + g_clear_error(&error); } + g_free(th->buf); g_free(th); } - } - else - g_message("dropped pkt"); + return ret; } @@ -335,19 +357,8 @@ static gint net_send_internal(struct con_data *c, const char *pkt, gsize nbytes) /** * @brief process the buffered network input * - * @note looks like none of these can be used to detect a disconnect - * of a client: - * g_io_stream_is_closed() - * g_socket_connection_is_connected() - * g_socket_is_closed(g_socket_connection_get_socket() - * g_socket_is_connected() - * g_socket_get_available_bytes() - * tried with glib 2.52.2+9+g3245eba16-1) - * we'll just record the amount of bytes in the stream and check - * whether it changed or not. if delta == 0 -> disconnected - * - * XXX after fixing up all bugs, this function has really nasty jump logic and - * needs rework asap + * XXX while it working flawlessly, this is still a very nasty function + * and needs refactoring, but I'll leave it be for now */ static void net_buffer_ready(GObject *source_object, GAsyncResult *res, @@ -367,7 +378,6 @@ static void net_buffer_ready(GObject *source_object, GAsyncResult *res, GBufferedInputStream *bistream; GError *error = NULL; - GBytes *b; c = (struct con_data *) user_data; @@ -375,6 +385,9 @@ static void net_buffer_ready(GObject *source_object, GAsyncResult *res, istream = G_INPUT_STREAM(source_object); bistream = G_BUFFERED_INPUT_STREAM(source_object); + if (g_cancellable_is_cancelled(c->ca)) + return; + pending: buf = g_buffered_input_stream_peek_buffer(bistream, &nbytes); @@ -402,8 +415,8 @@ pending: "%ld bytes.", pkt_size, g_buffered_input_stream_get_buffer_size(bistream)); - if (net_send(buf, nbytes) < 0) - goto error; + if (net_send(buf, nbytes) < 0) + goto error; if (pkt_size < MAX_PAYLOAD_SIZE) { @@ -434,14 +447,10 @@ pending: if (nbytes <= 0) goto error; -#if 0 - /* update stream byte count */ - g_buffered_input_stream_peek_buffer(bistream, &c->nbytes); -#endif pkt_hdr_to_host_order(pkt); /* verify packet payload */ - if (CRC16(pkt->data, pkt->data_size) == pkt->data_crc16) { + if (CRC16((guchar *) pkt->data, pkt->data_size) == pkt->data_crc16) { if (process_pkt(pkt, c->priv, c)) goto drop_pkt; @@ -459,12 +468,13 @@ pending: } else { g_message("Invalid CRC16 %x %x", - CRC16(pkt->data, pkt->data_size), pkt->data_crc16); + CRC16((guchar *) pkt->data, pkt->data_size), pkt->data_crc16); } drop_pkt: - g_message("Error occured, dropping input buffer and packet."); + g_message("Error occured in %s, dropping input buffer and packet.", + __func__); g_free(pkt); @@ -483,207 +493,242 @@ exit: /* continue buffering */ g_buffered_input_stream_fill_async(bistream, - g_buffered_input_stream_get_buffer_size(bistream), - G_PRIORITY_DEFAULT, - NULL, - net_buffer_ready, - c); + g_buffered_input_stream_get_buffer_size(bistream), + G_PRIORITY_DEFAULT, + c->ca, + net_buffer_ready, + c); return; error: - g_message("Error occured, dropping connection"); + g_message("Error occured in %s, initiating connection drop", __func__); + if (error) { - g_warning("%s", error->message); + g_warning("%s:%d %s", __func__, __LINE__, error->message); g_clear_error(&error); } - drop_connection(c); + + drop_con_begin(c); + drop_con_finalize(c); + return; } /** - * @brief handle an incoming connection + * @brief see if anyone has control if not, assign to current connection */ -static gboolean net_incoming(GSocketService *service, - GSocketConnection *connection, - GObject *source_object, - gpointer user_data) +static void assign_default_priv(struct con_data *c) { - gsize bufsize; - - GSocketAddress *addr; - GInetAddress *iaddr; - - GInputStream *istream; - GBufferedInputStream *bistream; + gboolean priv = FALSE; GList *elem; - struct con_data *c; struct con_data *item; - gboolean has_priv = FALSE; - gchar *str; + for (elem = con_list; elem; elem = elem->next) { + item = (struct con_data *) elem->data; - c = g_malloc0(sizeof(struct con_data)); + if (item->priv) + priv = TRUE; + } - addr = g_socket_connection_get_remote_address(connection, NULL); - iaddr = g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(addr)); - str = g_inet_address_to_string(iaddr); + if (!priv) + c->priv = TRUE; +} + + +/** + * @brief begin reception of client data + */ - g_message("Received incoming connection from %s", str); +static void begin_reception(struct con_data *c) +{ + gsize bufsize; + + GInputStream *istream; + GBufferedInputStream *bistream; + + /* set up as buffered input stream */ + istream = g_io_stream_get_input_stream(G_IO_STREAM(c->con)); + istream = g_buffered_input_stream_new(istream); + + bistream = G_BUFFERED_INPUT_STREAM(istream); + bufsize = g_buffered_input_stream_get_buffer_size(bistream); + + g_buffered_input_stream_fill_async(bistream, bufsize, + G_PRIORITY_DEFAULT, c->ca, + net_buffer_ready, c); +} + +/** + * @brief setup connection details + */ + +static void setup_connection(struct con_data *c) +{ + gchar *str; + + str = net_get_host_string(c->con); c->nick = g_strdup_printf("UserUnknown (%s)", str); c->new = TRUE; c->kick = FALSE; + c->ca = g_cancellable_new(); g_mutex_init(&c->lock); c->pool = g_thread_pool_new(do_send, (gpointer) c, - SERVER_MAX_TX_TH_PER_CON, FALSE, NULL); + SERVER_CON_POOL_SIZE, FALSE, NULL); + + g_socket_set_keepalive(g_socket_connection_get_socket(c->con), TRUE); g_free(str); +} - /* set up as buffered input stream */ - istream = g_io_stream_get_input_stream(G_IO_STREAM(connection)); - istream = g_buffered_input_stream_new(istream); +/** + * @brief handle an incoming connection + */ - bistream = G_BUFFERED_INPUT_STREAM(istream); +static gboolean net_incoming(GSocketService *service, + GSocketConnection *connection, + GObject *source_object, + gpointer user_data) +{ + gchar *str; - bufsize = g_buffered_input_stream_get_buffer_size(bistream); + struct con_data *c; + + + c = g_malloc0(sizeof(struct con_data)); /* reference, so it is not dropped by glib */ c->con = g_object_ref(connection); - g_socket_set_keepalive(g_socket_connection_get_socket(c->con), TRUE); + setup_connection(c); + assign_default_priv(c); + begin_reception(c); - /* add to list of connections */ + /* add to list of connections for outgoing data */ con_list = g_list_append(con_list, c); - /* see if anyone has control, if not, assign to current connection */ - for (elem = con_list; elem; elem = elem->next) { - item = (struct con_data *) elem->data; - if (item->priv) - has_priv = TRUE; - } - - if (!has_priv) - c->priv = TRUE; - - /* push new username after 1 seconds, so they have time to configure * theirs */ g_timeout_add_seconds(1, net_push_userlist_cb, NULL); - g_buffered_input_stream_fill_async(bistream, bufsize, - G_PRIORITY_DEFAULT, NULL, - net_buffer_ready, c); + + str = net_get_host_string(c->con); + g_message("Received connection from %s", str); + g_free(str); return FALSE; } +/** + * @brief send a packet to single client + * + * @returns <0 on error + */ -void net_server_reassign_control(gpointer ref) +gint net_send_single(gpointer ref, const char *pkt, gsize nbytes) { - GSocketAddress *addr; - GInetAddress *iaddr; - - GList *elem; - struct con_data *c; - struct con_data *item; - - gchar *msg; - gchar *str; c = (struct con_data *) ref; - for (elem = con_list; elem; elem = elem->next) { - item = (struct con_data *) elem->data; - item->priv = FALSE; - } - - c->priv = TRUE; - addr = g_socket_connection_get_remote_address(c->con, NULL); - iaddr = g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(addr)); - str = g_inet_address_to_string(iaddr); - msg = g_strdup_printf("Reassigned control to %s (connected from %s)", - c->nick, str); - - net_server_broadcast_message(msg, NULL); - net_push_userlist_cb(NULL); - - g_free(msg); - g_free(str); + return net_send_internal(c, pkt, nbytes); } - /** - * @brief send a packet to single client + * @brief send a packet to all connected clients * * @returns <0 on error */ -gint net_send_single(gpointer ref, const char *pkt, gsize nbytes) +gint net_send(const char *pkt, gsize nbytes) { - gint ret; + int ret = 0; GList *elem; struct con_data *c; - c = (struct con_data *) ref; - g_debug("Sending packet of %d bytes", nbytes); + for (elem = con_list; elem; elem = elem->next) { + + c = (struct con_data *) elem->data; + + if (c->kick) { -// g_mutex_lock(&netlock); + /* rewind one element, the current one will be removed + * in drop_con_begin() + */ + elem = elem->prev; - ret = net_send_internal(c, pkt, nbytes); + drop_con_begin(c); + + continue; + } + + ret |= net_send_single(c, pkt, nbytes); + } -// g_mutex_unlock(&netlock); return ret; } /** - * @brief se3nd a packet to all connected clients - * - * @returns <0 on error + * @brief assign control privilege level to connection */ -gint net_send(const char *pkt, gsize nbytes) +void net_server_reassign_control(gpointer ref) { GList *elem; + struct con_data *c; struct con_data *item; + gchar *msg; + gchar *str; - - g_debug("Broadcasting packet of %d bytes", nbytes); + c = (struct con_data *) ref; for (elem = con_list; elem; elem = elem->next) { item = (struct con_data *) elem->data; - net_send_single(item, pkt, nbytes); + item->priv = FALSE; } + c->priv = TRUE; - return 0; + str = net_get_host_string(c->con); + msg = g_strdup_printf("Reassigned control to %s (connected from %s)", + c->nick, str); + + net_server_broadcast_message(msg, NULL); + net_push_userlist_cb(NULL); + + g_free(msg); + g_free(str); } +/** + * @brief set the nickname for a connection + */ + void net_server_set_nickname(const gchar *nick, gpointer ref) { gchar *old; @@ -716,10 +761,13 @@ void net_server_set_nickname(const gchar *nick, gpointer ref) } +/** + * @brief broadcast a text message to all clients + */ + void net_server_broadcast_message(const gchar *msg, gpointer ref) { gchar *buf; - gchar *user; struct con_data *c; @@ -739,7 +787,7 @@ void net_server_broadcast_message(const gchar *msg, gpointer ref) } - cmd_message(PKT_TRANS_ID_UNDEF, buf, strlen(buf)); + cmd_message(PKT_TRANS_ID_UNDEF, (guchar *) buf, strlen(buf)); g_free(buf); }