Skip to content
Snippets Groups Projects
Commit 73439814 authored by Armin Luntzer's avatar Armin Luntzer
Browse files

server net: fixed all bugs I could trigger

parent ebc5a705
No related branches found
No related tags found
No related merge requests found
...@@ -30,7 +30,10 @@ ...@@ -30,7 +30,10 @@
#include <gio/gio.h> #include <gio/gio.h>
#include <glib.h> #include <glib.h>
#define SERVER_MAX_TX_TH_PER_CON 10 /* this number must be large enough to handle multiple subsequent submissions
* of packets for a connection
*/
#define SERVER_CON_POOL_SIZE 16
/* client connection data */ /* client connection data */
...@@ -42,6 +45,7 @@ struct con_data { ...@@ -42,6 +45,7 @@ struct con_data {
gboolean new; gboolean new;
gboolean kick; gboolean kick;
GMutex lock; GMutex lock;
GCancellable *ca;
GThreadPool *pool; GThreadPool *pool;
}; };
...@@ -54,7 +58,6 @@ struct thread { ...@@ -54,7 +58,6 @@ struct thread {
/* tracks client connections */ /* tracks client connections */
static GList *con_list; static GList *con_list;
static GMutex netlock;
/** /**
...@@ -69,6 +72,27 @@ static gsize get_pkt_size_peek(struct packet *pkt) ...@@ -69,6 +72,27 @@ static gsize get_pkt_size_peek(struct packet *pkt)
} }
/**
* @brief returns the host address as string
*/
static gchar *net_get_host_string(GSocketConnection *con)
{
GSocketAddress *addr;
GInetAddress *iaddr;
addr = g_socket_connection_get_remote_address(con, NULL);
iaddr = g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(addr));
return g_inet_address_to_string(iaddr);
}
/**
* @brief distribute a list of users to all clients
*/
static gboolean net_push_userlist_cb(gpointer data) static gboolean net_push_userlist_cb(gpointer data)
{ {
GList *elem; GList *elem;
...@@ -114,7 +138,7 @@ static gboolean net_push_userlist_cb(gpointer data) ...@@ -114,7 +138,7 @@ static gboolean net_push_userlist_cb(gpointer data)
if (msg) { if (msg) {
ack_userlist(PKT_TRANS_ID_UNDEF, msg, strlen(msg)); ack_userlist(PKT_TRANS_ID_UNDEF, (guchar *) msg, strlen(msg));
g_free(msg); g_free(msg);
} }
...@@ -123,210 +147,208 @@ static gboolean net_push_userlist_cb(gpointer data) ...@@ -123,210 +147,208 @@ static gboolean net_push_userlist_cb(gpointer data)
/** /**
* @brief drop a connection * @brief if connected, disconnect the socket
*/ */
static void drop_connection(struct con_data *c) static void try_disconnect_socket(struct con_data *c)
{ {
gchar *buf; gboolean ret = TRUE;
if (c->kick) { GSocket *s;
buf = g_strdup_printf("I kicked <tt><span foreground='#F1C40F'>" GError *error = NULL;
"%s</span></tt> for being a lazy bum",
c->nick);
} else {
buf = g_strdup_printf("<tt><span foreground='#F1C40F'>"
"%s</span></tt> disconnected",
c->nick);
}
net_server_broadcast_message(buf, NULL);
g_free(buf); if (!G_IS_SOCKET_CONNECTION(c->con))
g_free(c->nick); return;
g_object_unref(c->con); if (!g_socket_connection_is_connected(c->con))
return;
con_list = g_list_remove(con_list, c); s = g_socket_connection_get_socket(c->con);
g_thread_pool_free(c->pool, TRUE, FALSE); if (g_socket_is_connected(s))
ret = g_socket_close(s, &error);
/* BUG */
g_free(c);
net_push_userlist_cb(NULL); if (!ret) {
if (error) {
g_warning("%s:%d %s", __func__, __LINE__, error->message);
g_clear_error(&error);
}
}
} }
static void do_send(gpointer data, gpointer user_data)
{
struct con_data *c = (struct con_data *) user_data;
struct thread *th = (struct thread*) data;
GIOStream *stream;
GOutputStream *ostream;
GSocket *socket; /**
* @brief initiate a connection drop
*/
GError *error = NULL; static void drop_con_begin(struct con_data *c)
gboolean ret; {
gchar *str;
g_mutex_lock(&c->lock);
if (!G_IS_IO_STREAM(c->con)) { str = net_get_host_string(c->con);
c->kick = TRUE; g_message("Initiating disconnect for %s (%s)", str, c->nick);
goto bye; g_free(str);
}
socket = g_socket_connection_get_socket(c->con); con_list = g_list_remove(con_list, c);
if (!G_IS_SOCKET(socket)) {
c->kick = TRUE;
goto bye;
}
/* signal operations to stop */
g_cancellable_cancel(c->ca);
g_socket_set_timeout(socket, 10); g_thread_pool_free(c->pool, TRUE, FALSE);
c->pool = NULL;
stream = G_IO_STREAM(c->con); /* drop initial reference */
g_object_unref(c->con);
if (!G_IS_IO_STREAM(stream)) { try_disconnect_socket(c);
c->kick = TRUE;
goto bye;
} }
ostream = g_io_stream_get_output_stream(stream);
if (!G_IS_OUTPUT_STREAM(ostream)) { /**
c->kick = TRUE; * @brief finalize a connection drop
goto bye; */
}
ret = g_output_stream_write_all(ostream, th->buf, th->bytes, NULL, NULL, &error); static void drop_con_finalize(struct con_data *c)
{
gchar *buf;
if (!G_IS_SOCKET(socket)) {
c->kick = TRUE;
goto bye;
}
if (G_IS_OBJECT(c->con))
return;
if (!ret) { if (c->kick) {
if (error) { buf = g_strdup_printf("I kicked <tt><span foreground='#F1C40F'>"
g_warning("%s", error->message); "%s</span></tt> for being a lazy bum "
g_clear_error (&error); "(client input saturated or timed out)",
c->nick);
} else {
buf = g_strdup_printf("<tt><span foreground='#F1C40F'>"
"%s</span></tt> disconnected",
c->nick);
} }
c->kick = TRUE; net_server_broadcast_message(buf, NULL);
net_push_userlist_cb(NULL);
g_free(c->nick);
g_free(buf);
g_free(c);
} }
bye:
g_socket_set_timeout(socket, 0);
g_mutex_unlock(&c->lock);
g_free(th->buf);
g_free(th);
} /**
* @brief send data on a connection
*/
static void do_send(gpointer data, gpointer user_data)
{
gboolean ret;
GOutputStream *os;
struct thread *th;
struct con_data *c;
/**
* @brief send a packet on a connection
*/
static gint net_send_internal(struct con_data *c, const char *pkt, gsize nbytes)
{
gssize ret = 0;
th = (struct thread *) data;
c = (struct con_data *) user_data;
GError *error = NULL;
GIOStream *stream; if (!G_IS_IO_STREAM(c->con))
GOutputStream *ostream; goto exit;
g_object_ref(c->con);
os = g_io_stream_get_output_stream(G_IO_STREAM(c->con));
stream = G_IO_STREAM(c->con); g_mutex_lock(&c->lock);
if (!G_IS_IO_STREAM(stream)) ret = g_output_stream_write_all(os, th->buf, th->bytes, NULL, c->ca,
return -1; NULL);
if (c->kick) { if (!ret)
GSocket *socket; c->kick = TRUE;
socket = g_socket_connection_get_socket(c->con); g_mutex_unlock(&c->lock);
g_object_unref(c->con);
if (!g_socket_is_closed(socket)) { exit:
ret = g_socket_close(socket, &error); /* if this was the last reference, call finalize */
g_warning("Closed socket on timed out client"); if (!G_IS_OBJECT(c->con))
drop_con_finalize(c);
if (!ret) { g_free(th->buf);
if (error) { g_free(th);
g_warning("%s", error->message);
g_clear_error(&error);
}
}
} }
return -1;
}
/**
* @brief send a packet on a connection
*
* @returns 0 on success, otherwise error
*/
ostream = g_io_stream_get_output_stream(stream); static gboolean net_send_internal(struct con_data *c, const char *pkt, gsize nbytes)
{
gboolean ret;
struct thread *th;
if (g_io_stream_is_closed(stream)) { GError *error = NULL;
g_message("Error sending packet: stream closed\n");
return -1;
}
if (!g_socket_connection_is_connected(c->con)) { if (!c->pool)
g_message("Error sending packet: socket not connected\n"); return FALSE;
return -1;
}
#if 0 if (c->kick)
/* set a 1 second socket timeout to detect broken connections */ return FALSE;
g_socket_set_timeout(g_socket_connection_get_socket(c->con), 1);
ret = g_output_stream_write_all(ostream, pkt, nbytes, NULL, NULL, &error);
/* set back to infinite */
g_socket_set_timeout(g_socket_connection_get_socket(c->con), 0);
if (!ret) {
if (error) { if (g_thread_pool_get_num_threads(c->pool) >= SERVER_CON_POOL_SIZE) {
g_warning("%s", error->message);
g_clear_error (&error); gchar *str;
}
str = net_get_host_string(c->con);
g_message("Will kick client %s connected from %s: "
"dropped pkt, out of threads: %d of %d in use",
c->nick, str,
g_thread_pool_get_num_threads(c->pool),
SERVER_CON_POOL_SIZE);
c->kick = TRUE; c->kick = TRUE;
} g_free(str);
#endif
if (g_thread_pool_get_num_threads(c->pool) < SERVER_MAX_TX_TH_PER_CON) return FALSE;
{ }
struct thread *th;
th = g_malloc(sizeof(struct thread)); th = g_malloc(sizeof(struct thread));
th->bytes = nbytes; th->bytes = nbytes;
th->buf = g_memdup(pkt, nbytes); th->buf = g_memdup(pkt, nbytes);
ret = g_thread_pool_push(c->pool, (gpointer) th, &error);
// g_message("threads %d of %d", g_thread_pool_get_num_threads(c->pool), SERVER_MAX_TX_TH_PER_CON);
ret = g_thread_pool_push(c->pool, (gpointer) th, &error);
if (!ret) { if (!ret) {
if (error) { if (error) {
g_warning("%s", error->message); g_warning("%s:%d %s", __func__, __LINE__,
error->message);
g_clear_error(&error); g_clear_error(&error);
} }
g_free(th->buf); g_free(th->buf);
g_free(th); g_free(th);
} }
}
else
g_message("dropped pkt");
return ret; return ret;
} }
...@@ -335,19 +357,8 @@ static gint net_send_internal(struct con_data *c, const char *pkt, gsize nbytes) ...@@ -335,19 +357,8 @@ static gint net_send_internal(struct con_data *c, const char *pkt, gsize nbytes)
/** /**
* @brief process the buffered network input * @brief process the buffered network input
* *
* @note looks like none of these can be used to detect a disconnect * XXX while it working flawlessly, this is still a very nasty function
* of a client: * and needs refactoring, but I'll leave it be for now
* g_io_stream_is_closed()
* g_socket_connection_is_connected()
* g_socket_is_closed(g_socket_connection_get_socket()
* g_socket_is_connected()
* g_socket_get_available_bytes()
* tried with glib 2.52.2+9+g3245eba16-1)
* we'll just record the amount of bytes in the stream and check
* whether it changed or not. if delta == 0 -> disconnected
*
* XXX after fixing up all bugs, this function has really nasty jump logic and
* needs rework asap
*/ */
static void net_buffer_ready(GObject *source_object, GAsyncResult *res, static void net_buffer_ready(GObject *source_object, GAsyncResult *res,
...@@ -367,7 +378,6 @@ static void net_buffer_ready(GObject *source_object, GAsyncResult *res, ...@@ -367,7 +378,6 @@ static void net_buffer_ready(GObject *source_object, GAsyncResult *res,
GBufferedInputStream *bistream; GBufferedInputStream *bistream;
GError *error = NULL; GError *error = NULL;
GBytes *b;
c = (struct con_data *) user_data; c = (struct con_data *) user_data;
...@@ -375,6 +385,9 @@ static void net_buffer_ready(GObject *source_object, GAsyncResult *res, ...@@ -375,6 +385,9 @@ static void net_buffer_ready(GObject *source_object, GAsyncResult *res,
istream = G_INPUT_STREAM(source_object); istream = G_INPUT_STREAM(source_object);
bistream = G_BUFFERED_INPUT_STREAM(source_object); bistream = G_BUFFERED_INPUT_STREAM(source_object);
if (g_cancellable_is_cancelled(c->ca))
return;
pending: pending:
buf = g_buffered_input_stream_peek_buffer(bistream, &nbytes); buf = g_buffered_input_stream_peek_buffer(bistream, &nbytes);
...@@ -434,14 +447,10 @@ pending: ...@@ -434,14 +447,10 @@ pending:
if (nbytes <= 0) if (nbytes <= 0)
goto error; goto error;
#if 0
/* update stream byte count */
g_buffered_input_stream_peek_buffer(bistream, &c->nbytes);
#endif
pkt_hdr_to_host_order(pkt); pkt_hdr_to_host_order(pkt);
/* verify packet payload */ /* verify packet payload */
if (CRC16(pkt->data, pkt->data_size) == pkt->data_crc16) { if (CRC16((guchar *) pkt->data, pkt->data_size) == pkt->data_crc16) {
if (process_pkt(pkt, c->priv, c)) if (process_pkt(pkt, c->priv, c))
goto drop_pkt; goto drop_pkt;
...@@ -459,12 +468,13 @@ pending: ...@@ -459,12 +468,13 @@ pending:
} else { } else {
g_message("Invalid CRC16 %x %x", g_message("Invalid CRC16 %x %x",
CRC16(pkt->data, pkt->data_size), pkt->data_crc16); CRC16((guchar *) pkt->data, pkt->data_size), pkt->data_crc16);
} }
drop_pkt: drop_pkt:
g_message("Error occured, dropping input buffer and packet."); g_message("Error occured in %s, dropping input buffer and packet.",
__func__);
g_free(pkt); g_free(pkt);
...@@ -485,205 +495,240 @@ exit: ...@@ -485,205 +495,240 @@ exit:
g_buffered_input_stream_fill_async(bistream, g_buffered_input_stream_fill_async(bistream,
g_buffered_input_stream_get_buffer_size(bistream), g_buffered_input_stream_get_buffer_size(bistream),
G_PRIORITY_DEFAULT, G_PRIORITY_DEFAULT,
NULL, c->ca,
net_buffer_ready, net_buffer_ready,
c); c);
return; return;
error: error:
g_message("Error occured, dropping connection"); g_message("Error occured in %s, initiating connection drop", __func__);
if (error) { if (error) {
g_warning("%s", error->message); g_warning("%s:%d %s", __func__, __LINE__, error->message);
g_clear_error(&error); g_clear_error(&error);
} }
drop_connection(c);
drop_con_begin(c);
drop_con_finalize(c);
return; return;
} }
/** /**
* @brief handle an incoming connection * @brief see if anyone has control if not, assign to current connection
*/ */
static gboolean net_incoming(GSocketService *service, static void assign_default_priv(struct con_data *c)
GSocketConnection *connection,
GObject *source_object,
gpointer user_data)
{ {
gsize bufsize; gboolean priv = FALSE;
GSocketAddress *addr;
GInetAddress *iaddr;
GInputStream *istream;
GBufferedInputStream *bistream;
GList *elem; GList *elem;
struct con_data *c;
struct con_data *item; struct con_data *item;
gboolean has_priv = FALSE;
gchar *str; for (elem = con_list; elem; elem = elem->next) {
item = (struct con_data *) elem->data;
c = g_malloc0(sizeof(struct con_data)); if (item->priv)
priv = TRUE;
}
addr = g_socket_connection_get_remote_address(connection, NULL); if (!priv)
iaddr = g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(addr)); c->priv = TRUE;
str = g_inet_address_to_string(iaddr); }
/**
* @brief begin reception of client data
*/
static void begin_reception(struct con_data *c)
{
gsize bufsize;
GInputStream *istream;
GBufferedInputStream *bistream;
g_message("Received incoming connection from %s", str); /* set up as buffered input stream */
istream = g_io_stream_get_input_stream(G_IO_STREAM(c->con));
istream = g_buffered_input_stream_new(istream);
bistream = G_BUFFERED_INPUT_STREAM(istream);
bufsize = g_buffered_input_stream_get_buffer_size(bistream);
g_buffered_input_stream_fill_async(bistream, bufsize,
G_PRIORITY_DEFAULT, c->ca,
net_buffer_ready, c);
}
/**
* @brief setup connection details
*/
static void setup_connection(struct con_data *c)
{
gchar *str;
str = net_get_host_string(c->con);
c->nick = g_strdup_printf("UserUnknown (%s)", str); c->nick = g_strdup_printf("UserUnknown (%s)", str);
c->new = TRUE; c->new = TRUE;
c->kick = FALSE; c->kick = FALSE;
c->ca = g_cancellable_new();
g_mutex_init(&c->lock); g_mutex_init(&c->lock);
c->pool = g_thread_pool_new(do_send, (gpointer) c, c->pool = g_thread_pool_new(do_send, (gpointer) c,
SERVER_MAX_TX_TH_PER_CON, FALSE, NULL); SERVER_CON_POOL_SIZE, FALSE, NULL);
g_socket_set_keepalive(g_socket_connection_get_socket(c->con), TRUE);
g_free(str); g_free(str);
}
/* set up as buffered input stream */ /**
istream = g_io_stream_get_input_stream(G_IO_STREAM(connection)); * @brief handle an incoming connection
istream = g_buffered_input_stream_new(istream); */
bistream = G_BUFFERED_INPUT_STREAM(istream); static gboolean net_incoming(GSocketService *service,
GSocketConnection *connection,
GObject *source_object,
gpointer user_data)
{
gchar *str;
bufsize = g_buffered_input_stream_get_buffer_size(bistream); struct con_data *c;
c = g_malloc0(sizeof(struct con_data));
/* reference, so it is not dropped by glib */ /* reference, so it is not dropped by glib */
c->con = g_object_ref(connection); c->con = g_object_ref(connection);
g_socket_set_keepalive(g_socket_connection_get_socket(c->con), TRUE); setup_connection(c);
assign_default_priv(c);
begin_reception(c);
/* add to list of connections */ /* add to list of connections for outgoing data */
con_list = g_list_append(con_list, c); con_list = g_list_append(con_list, c);
/* see if anyone has control, if not, assign to current connection */
for (elem = con_list; elem; elem = elem->next) {
item = (struct con_data *) elem->data;
if (item->priv)
has_priv = TRUE;
}
if (!has_priv)
c->priv = TRUE;
/* push new username after 1 seconds, so they have time to configure /* push new username after 1 seconds, so they have time to configure
* theirs * theirs
*/ */
g_timeout_add_seconds(1, net_push_userlist_cb, NULL); g_timeout_add_seconds(1, net_push_userlist_cb, NULL);
g_buffered_input_stream_fill_async(bistream, bufsize,
G_PRIORITY_DEFAULT, NULL, str = net_get_host_string(c->con);
net_buffer_ready, c); g_message("Received connection from %s", str);
g_free(str);
return FALSE; return FALSE;
} }
/**
* @brief send a packet to single client
*
* @returns <0 on error
*/
void net_server_reassign_control(gpointer ref) gint net_send_single(gpointer ref, const char *pkt, gsize nbytes)
{ {
GSocketAddress *addr;
GInetAddress *iaddr;
GList *elem;
struct con_data *c; struct con_data *c;
struct con_data *item;
gchar *msg;
gchar *str;
c = (struct con_data *) ref; c = (struct con_data *) ref;
for (elem = con_list; elem; elem = elem->next) {
item = (struct con_data *) elem->data;
item->priv = FALSE;
}
c->priv = TRUE;
addr = g_socket_connection_get_remote_address(c->con, NULL);
iaddr = g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(addr));
str = g_inet_address_to_string(iaddr);
msg = g_strdup_printf("Reassigned control to %s (connected from %s)",
c->nick, str);
net_server_broadcast_message(msg, NULL);
net_push_userlist_cb(NULL);
g_free(msg); return net_send_internal(c, pkt, nbytes);
g_free(str);
} }
/** /**
* @brief send a packet to single client * @brief send a packet to all connected clients
* *
* @returns <0 on error * @returns <0 on error
*/ */
gint net_send_single(gpointer ref, const char *pkt, gsize nbytes) gint net_send(const char *pkt, gsize nbytes)
{ {
gint ret; int ret = 0;
GList *elem; GList *elem;
struct con_data *c; struct con_data *c;
c = (struct con_data *) ref;
g_debug("Sending packet of %d bytes", nbytes); for (elem = con_list; elem; elem = elem->next) {
c = (struct con_data *) elem->data;
if (c->kick) {
/* rewind one element, the current one will be removed
* in drop_con_begin()
*/
elem = elem->prev;
drop_con_begin(c);
// g_mutex_lock(&netlock); continue;
}
ret = net_send_internal(c, pkt, nbytes); ret |= net_send_single(c, pkt, nbytes);
}
// g_mutex_unlock(&netlock);
return ret; return ret;
} }
/** /**
* @brief se3nd a packet to all connected clients * @brief assign control privilege level to connection
*
* @returns <0 on error
*/ */
gint net_send(const char *pkt, gsize nbytes) void net_server_reassign_control(gpointer ref)
{ {
GList *elem; GList *elem;
struct con_data *c;
struct con_data *item; struct con_data *item;
gchar *msg;
gchar *str;
c = (struct con_data *) ref;
g_debug("Broadcasting packet of %d bytes", nbytes);
for (elem = con_list; elem; elem = elem->next) { for (elem = con_list; elem; elem = elem->next) {
item = (struct con_data *) elem->data; item = (struct con_data *) elem->data;
net_send_single(item, pkt, nbytes); item->priv = FALSE;
} }
c->priv = TRUE;
return 0; str = net_get_host_string(c->con);
msg = g_strdup_printf("Reassigned control to %s (connected from %s)",
c->nick, str);
net_server_broadcast_message(msg, NULL);
net_push_userlist_cb(NULL);
g_free(msg);
g_free(str);
} }
/**
* @brief set the nickname for a connection
*/
void net_server_set_nickname(const gchar *nick, gpointer ref) void net_server_set_nickname(const gchar *nick, gpointer ref)
{ {
gchar *old; gchar *old;
...@@ -716,10 +761,13 @@ void net_server_set_nickname(const gchar *nick, gpointer ref) ...@@ -716,10 +761,13 @@ void net_server_set_nickname(const gchar *nick, gpointer ref)
} }
/**
* @brief broadcast a text message to all clients
*/
void net_server_broadcast_message(const gchar *msg, gpointer ref) void net_server_broadcast_message(const gchar *msg, gpointer ref)
{ {
gchar *buf; gchar *buf;
gchar *user;
struct con_data *c; struct con_data *c;
...@@ -739,7 +787,7 @@ void net_server_broadcast_message(const gchar *msg, gpointer ref) ...@@ -739,7 +787,7 @@ void net_server_broadcast_message(const gchar *msg, gpointer ref)
} }
cmd_message(PKT_TRANS_ID_UNDEF, buf, strlen(buf)); cmd_message(PKT_TRANS_ID_UNDEF, (guchar *) buf, strlen(buf));
g_free(buf); g_free(buf);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment