Use poll instead of select

This commit is contained in:
Grégory Soutadé 2016-02-04 20:39:50 +01:00
parent 73732adbef
commit 58b0439d9d
1 changed files with 58 additions and 27 deletions

View File

@ -1,6 +1,8 @@
#define _GNU_SOURCE 1 // for POLLRDHUP && syncfs
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/select.h> #include <sys/select.h>
#include <sys/time.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <time.h> #include <time.h>
@ -11,6 +13,7 @@
#include <pthread.h> #include <pthread.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <poll.h>
#ifdef USE_SECCOMP #ifdef USE_SECCOMP
#include <seccomp.h> #include <seccomp.h>
@ -19,14 +22,15 @@
#include "ip_to_geo.h" #include "ip_to_geo.h"
#include "protocol.h" #include "protocol.h"
#define WAIT_TIME 100
#define MAX_WAIT_TIME 500
typedef struct { typedef struct {
int socket; int socket;
time_t timeout; time_t timeout;
int nb_remaining_requests; int nb_remaining_requests;
} socket_ctx_t; } socket_ctx_t;
// TODO : sandbox
typedef struct thread_ctx_s{ typedef struct thread_ctx_s{
struct thread_ctx_s* prev; struct thread_ctx_s* prev;
struct thread_ctx_s* next; struct thread_ctx_s* next;
@ -39,6 +43,7 @@ typedef struct thread_ctx_s{
int stop; int stop;
int quiet; int quiet;
pthread_mutex_t mutex; pthread_mutex_t mutex;
struct pollfd * pollfds;
} thread_ctx_t; } thread_ctx_t;
static pthread_mutex_t s_fastmutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t s_fastmutex = PTHREAD_MUTEX_INITIALIZER;
@ -53,8 +58,6 @@ void sigint(int sig)
shutdown(s_server_socket, SHUT_RDWR); shutdown(s_server_socket, SHUT_RDWR);
} }
// TODO signal capture
static int check_request(request_t* req) static int check_request(request_t* req)
{ {
if (req->magic != REQ_MAGIC) if (req->magic != REQ_MAGIC)
@ -80,6 +83,7 @@ static int handle_request(thread_ctx_t* thread_ctx, int socket)
{ {
request_t req; request_t req;
const uint8_t* geo; const uint8_t* geo;
int sent=0;
int ret = read(socket, &req, sizeof(req)); int ret = read(socket, &req, sizeof(req));
// Socket closed // Socket closed
@ -134,7 +138,12 @@ static int handle_request(thread_ctx_t* thread_ctx, int socket)
} }
} }
write(socket, &req, sizeof(req)); for (sent=0; sent < sizeof(req); sent += ret)
{
ret = write(socket, &((uint8_t*)&req)[sent], sizeof(req)-sent);
if (ret < 0)
return -1;
}
return 0; return 0;
} }
@ -158,6 +167,7 @@ static void delete_thread(thread_ctx_t* thread_ctx)
} }
free(thread_ctx->sockets); free(thread_ctx->sockets);
free(thread_ctx->pollfds);
if (thread_ctx->next) if (thread_ctx->next)
thread_ctx->next->prev = thread_ctx->prev; thread_ctx->next->prev = thread_ctx->prev;
@ -177,17 +187,17 @@ static inline void close_socket(socket_ctx_t* socket)
close(socket->socket); close(socket->socket);
} }
#define POLL_ERR_MASK (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL)
static void* thread_loop(void* param) static void* thread_loop(void* param)
{ {
thread_ctx_t* ctx = (thread_ctx_t*)param; thread_ctx_t* ctx = (thread_ctx_t*)param;
int i, ret, nfds, nb_cur_sockets, nb_available_sockets; int i, ret, nfds, nb_cur_sockets, nb_available_sockets, poll_idx;
fd_set read_set, exc_set; struct timeval time1, time2, time_res;
struct timeval timeout; int wait_time = WAIT_TIME;
while (!ctx->stop) while (!ctx->stop)
{ {
FD_ZERO(&read_set);
FD_ZERO(&exc_set);
nfds = 0; nfds = 0;
pthread_mutex_lock(&ctx->mutex); pthread_mutex_lock(&ctx->mutex);
@ -199,10 +209,9 @@ static void* thread_loop(void* param)
{ {
if (ctx->sockets[i].timeout > 0) if (ctx->sockets[i].timeout > 0)
{ {
FD_SET(ctx->sockets[i].socket, &read_set); ctx->pollfds[nfds].fd = ctx->sockets[i].socket;
FD_SET(ctx->sockets[i].socket, &exc_set); ctx->pollfds[nfds].events = POLLIN|POLL_ERR_MASK;
if (ctx->sockets[i].socket > nfds) nfds++;
nfds = ctx->sockets[i].socket;
} }
} }
@ -215,20 +224,25 @@ static void* thread_loop(void* param)
if (!nb_available_sockets) if (!nb_available_sockets)
break; break;
usleep(100); if (wait_time < MAX_WAIT_TIME)
wait_time += WAIT_TIME;
usleep(wait_time);
continue; continue;
} }
else
timeout.tv_sec = ctx->max_timeout; wait_time = WAIT_TIME;
timeout.tv_usec = 0;
ret = select(nfds+1, &read_set, NULL, &exc_set, &timeout); gettimeofday(&time1, NULL);
ret = poll(ctx->pollfds, nfds, ctx->max_timeout);
gettimeofday(&time2, NULL);
// Timeout, remove all current sockets // Timeout, remove all current sockets
if (ret == 0) if (ret == 0)
{ {
if (ctx->quiet < 0) if (ctx->quiet < 0)
syslog(LOG_DEBUG, "Timeout"); syslog(LOG_DEBUG, "Timeout");
for(i=0; i<nb_cur_sockets; i++) for(i=0; i<nb_cur_sockets; i++)
{ {
if (ctx->sockets[i].timeout > 0) if (ctx->sockets[i].timeout > 0)
@ -238,21 +252,35 @@ static void* thread_loop(void* param)
else if (ret < 0) else if (ret < 0)
{ {
if (!s_stop && !ctx->stop) if (!s_stop && !ctx->stop)
syslog(LOG_WARNING, "select has errors (%m)\n"); syslog(LOG_WARNING, "poll has errors (%m)\n");
break;
} }
else else
{ {
for(i=0; i<nb_cur_sockets; i++) timersub(&time2, &time1, &time_res);
poll_idx = -1;
for(i=0; i<ctx->nb_cur_sockets; i++)
{ {
if (ctx->sockets[i].timeout < 0) continue; if (ctx->sockets[i].timeout < 0) continue;
if (FD_ISSET(ctx->sockets[i].socket, &exc_set)) poll_idx++;
if (ctx->pollfds[poll_idx].fd != ctx->sockets[i].socket)
{ {
if (ctx->quiet < 0)
syslog(LOG_ERR, "Socket not found but present in poll fds");
continue;
}
// Error
if (ctx->pollfds[poll_idx].revents & POLL_ERR_MASK)
{
if (ctx->quiet < 0)
syslog(LOG_ERR, "Error with socket %d", ctx->sockets[i].socket);
close_socket(&ctx->sockets[i]); close_socket(&ctx->sockets[i]);
} }
// Someone is speaking // Someone is speaking
else if (FD_ISSET(ctx->sockets[i].socket, &read_set)) else if (ctx->pollfds[poll_idx].revents & POLLIN)
{ {
ctx->sockets[i].timeout = ctx->max_timeout; ctx->sockets[i].timeout = ctx->max_timeout*1000;
ret = handle_request(ctx, ctx->sockets[i].socket); ret = handle_request(ctx, ctx->sockets[i].socket);
if (ret == 1) if (ret == 1)
{ {
@ -273,7 +301,9 @@ static void* thread_loop(void* param)
} }
else else
{ {
ctx->sockets[i].timeout -= timeout.tv_sec*1000000 + timeout.tv_usec; ctx->sockets[i].timeout -= (time_res.tv_sec*1000000 + time_res.tv_usec);
if (ctx->sockets[i].timeout <= 0)
close_socket(&ctx->sockets[i]);
} }
} }
} }
@ -294,9 +324,10 @@ static inline thread_ctx_t* create_thread_ctx(struct gengetopt_args_info* params
syslog(LOG_DEBUG, "Create a new thread %p", thread_ctx); syslog(LOG_DEBUG, "Create a new thread %p", thread_ctx);
thread_ctx->sockets = malloc(sizeof(*thread_ctx->sockets)*params->sockets_per_thread_arg); thread_ctx->sockets = malloc(sizeof(*thread_ctx->sockets)*params->sockets_per_thread_arg);
thread_ctx->pollfds = malloc(sizeof(*thread_ctx->pollfds)*params->sockets_per_thread_arg);
thread_ctx->nb_cur_sockets = 0; thread_ctx->nb_cur_sockets = 0;
thread_ctx->nb_available_sockets = params->sockets_per_thread_arg; thread_ctx->nb_available_sockets = params->sockets_per_thread_arg;
thread_ctx->max_timeout = params->sockets_timeout_arg*1000000; thread_ctx->max_timeout = params->sockets_timeout_arg*1000;
thread_ctx->stop = 0; thread_ctx->stop = 0;
thread_ctx->quiet = params->quiet_flag; thread_ctx->quiet = params->quiet_flag;
if (params->verbose_flag) if (params->verbose_flag)
@ -330,7 +361,7 @@ static void fill_new_socket(struct gengetopt_args_info* params, int socket)
pthread_mutex_unlock(&s_fastmutex); pthread_mutex_unlock(&s_fastmutex);
thread_ctx->sockets[thread_ctx->nb_cur_sockets].socket = socket; thread_ctx->sockets[thread_ctx->nb_cur_sockets].socket = socket;
thread_ctx->sockets[thread_ctx->nb_cur_sockets].timeout = thread_ctx->max_timeout; thread_ctx->sockets[thread_ctx->nb_cur_sockets].timeout = thread_ctx->max_timeout*1000; // ms -> us
thread_ctx->sockets[thread_ctx->nb_cur_sockets].nb_remaining_requests = params->client_max_requests_arg; thread_ctx->sockets[thread_ctx->nb_cur_sockets].nb_remaining_requests = params->client_max_requests_arg;
pthread_mutex_lock(&thread_ctx->mutex); pthread_mutex_lock(&thread_ctx->mutex);