IP to Geo

IP to Geo Git Source Tree

Root/src/server.c

1#define _GNU_SOURCE 1 // for POLLRDHUP && syncfs
2#include <sys/types.h>
3#include <sys/socket.h>
4#include <sys/select.h>
5#include <sys/time.h>
6#include <arpa/inet.h>
7#include <netinet/in.h>
8#include <time.h>
9#include <syslog.h>
10#include <signal.h>
11#include <stdio.h>
12#include <unistd.h>
13#include <pthread.h>
14#include <stdlib.h>
15#include <string.h>
16#include <poll.h>
17
18#ifdef USE_SECCOMP
19#include <seccomp.h>
20#endif
21
22#include "ip_to_geo.h"
23#include "protocol.h"
24
25#define WAIT_TIME 100
26#define MAX_WAIT_TIME 500
27
28typedef struct {
29 int socket;
30 time_t timeout; // in µs
31 int nb_remaining_requests;
32} socket_ctx_t;
33
34typedef struct thread_ctx_s{
35 struct thread_ctx_s* prev;
36 struct thread_ctx_s* next;
37 pthread_t thread;
38 socket_ctx_t* sockets;
39 int nb_cur_sockets;
40 int nb_available_sockets;
41 int max_timeout;
42 int max_sockets; // in ms
43 int stop;
44 int quiet;
45 pthread_mutex_t mutex;
46 struct pollfd * pollfds;
47} thread_ctx_t;
48
49static pthread_mutex_t s_fastmutex = PTHREAD_MUTEX_INITIALIZER;
50static thread_ctx_t* s_last_thread = NULL;
51static int s_server_socket = -1;
52static int s_stop = 0;
53
54void sigint(int sig)
55{
56 syslog(LOG_WARNING, "signal received, stopping threads");
57 s_stop = 1;
58 shutdown(s_server_socket, SHUT_RDWR);
59}
60
61static int check_request(request_t* req)
62{
63 if (req->magic != REQ_MAGIC)
64 return REQ_ERR_BAD_MAGIC;
65
66 if (req->version != REQ_VERSION)
67 return REQ_ERR_BAD_VERSION;
68
69 if (req->ip_type != REQ_IPV4 &&
70 req->ip_type != REQ_IPV6)
71 return REQ_ERR_BAD_IP_VERSION;
72
73 if (req->ip_type != REQ_IPV4)
74 return REQ_ERR_UNSUPPORTED_IP_VERSION;
75
76 if (req->req != REQ_REQ)
77 return REQ_ERR_BAD_REQ_FIELD;
78
79 return REQ_ERR_NO_ERR;
80}
81
82static int handle_request(thread_ctx_t* thread_ctx, int socket)
83{
84 request_t req;
85 const uint8_t* geo;
86 int sent=0;
87 int ret = read(socket, &req, sizeof(req));
88
89 // Socket closed
90 if (ret == 0)
91 {
92 if (thread_ctx->quiet < 0)
93 syslog(LOG_DEBUG, "Socket %d closed", socket);
94 return 1;
95 }
96
97 if (thread_ctx->quiet < 0)
98 syslog(LOG_DEBUG, "New request");
99
100 if (ret != sizeof(req))
101 {
102 if (thread_ctx->quiet < 0)
103 syslog(LOG_DEBUG, "Invalid request size %d", ret);
104
105 return -1;
106 }
107
108 ret = check_request(&req);
109 req.req = REQ_RESP;
110
111 if (ret)
112 {
113 req.err = ret;
114 if (thread_ctx->quiet < 0)
115 syslog(LOG_DEBUG, "Request error %d", ret);
116 }
117 else
118 {
119 if (thread_ctx->quiet < 0)
120 {
121 char dst[64];
122 inet_ntop((req.ip_type == REQ_IPV4)?AF_INET:AF_INET6, req.ip, dst, sizeof(dst));
123 syslog(LOG_DEBUG, "Request for %s from socket %d", dst, socket);
124 }
125
126 geo = ip_to_geo(req.ip, req.ip_type);
127 if (!geo)
128 {
129 req.err = REQ_IP_NOT_FOUND;
130 if (thread_ctx->quiet < 0)
131 syslog(LOG_DEBUG, "Not found");
132 }
133 else
134 {
135 req.err = REQ_ERR_NO_ERR;
136 geo = get_country_code(geo);
137 if (thread_ctx->quiet < 0)
138 syslog(LOG_DEBUG, "Res %s", geo);
139 req.country_code[0] = geo[0];
140 req.country_code[1] = geo[1];
141 req.country_code[2] = 0;
142 req.country_code[3] = 0;
143 }
144 }
145
146 for (sent=0; sent < sizeof(req); sent += ret)
147 {
148 ret = write(socket, &((uint8_t*)&req)[sent], sizeof(req)-sent);
149 if (ret < 0)
150 return -1;
151 }
152
153 return 0;
154}
155
156static void delete_thread(thread_ctx_t* thread_ctx)
157{
158 int i;
159
160 pthread_mutex_lock(&s_fastmutex);
161 thread_ctx->nb_available_sockets = 0;
162
163 if (thread_ctx->quiet < 0)
164 syslog(LOG_DEBUG, "Delete thread %p", thread_ctx);
165
166 for(i=0; i<thread_ctx->nb_cur_sockets; i++)
167 {
168 if (thread_ctx->sockets[i].timeout > 0)
169 {
170 close (thread_ctx->sockets[i].socket);
171 }
172 }
173
174 free(thread_ctx->sockets);
175 free(thread_ctx->pollfds);
176
177 if (thread_ctx->next)
178 thread_ctx->next->prev = thread_ctx->prev;
179 if (thread_ctx->prev)
180 thread_ctx->prev->next = thread_ctx->next;
181
182 if (thread_ctx == s_last_thread)
183 s_last_thread = thread_ctx->next;
184 pthread_mutex_unlock(&s_fastmutex);
185
186 free(thread_ctx);
187}
188
189static inline void close_socket(socket_ctx_t* socket)
190{
191 socket->timeout = -1;
192 close(socket->socket);
193}
194
195#define POLL_ERR_MASK (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL)
196
197static void* thread_loop(void* param)
198{
199 thread_ctx_t* ctx = (thread_ctx_t*)param;
200 int i, ret, nfds, nb_cur_sockets, nb_available_sockets, poll_idx;
201 struct timeval time1, time2, time_res;
202 int wait_time = WAIT_TIME;
203
204 while (!ctx->stop)
205 {
206 nfds = 0;
207
208 pthread_mutex_lock(&ctx->mutex);
209 nb_cur_sockets = ctx->nb_cur_sockets;
210 nb_available_sockets = ctx->nb_available_sockets;
211 pthread_mutex_unlock(&ctx->mutex);
212
213 for(i=0; i<nb_cur_sockets; i++)
214 {
215 if (ctx->sockets[i].timeout > 0)
216 {
217 ctx->pollfds[nfds].fd = ctx->sockets[i].socket;
218 ctx->pollfds[nfds].events = POLLIN|POLL_ERR_MASK;
219 nfds++;
220 }
221 }
222
223 if (!nfds)
224 {
225 /*
226 No more active socket for this thread
227 nor available slots
228 */
229 if (!nb_available_sockets)
230 break;
231
232 if (wait_time < MAX_WAIT_TIME)
233 wait_time += WAIT_TIME;
234
235 usleep(wait_time);
236 continue;
237 }
238 else
239 wait_time = WAIT_TIME;
240
241 gettimeofday(&time1, NULL);
242 ret = poll(ctx->pollfds, nfds, ctx->max_timeout);
243 gettimeofday(&time2, NULL);
244
245 // Timeout, remove all current sockets
246 if (ret == 0)
247 {
248 if (ctx->quiet < 0)
249 syslog(LOG_DEBUG, "Timeout");
250
251 for(i=0; i<nb_cur_sockets; i++)
252 {
253 if (ctx->sockets[i].timeout > 0)
254 close_socket(&ctx->sockets[i]);
255 }
256 }
257 else if (ret < 0)
258 {
259 if (!s_stop && !ctx->stop)
260 syslog(LOG_WARNING, "poll has errors (%m)\n");
261 break;
262 }
263 else
264 {
265 timersub(&time2, &time1, &time_res);
266 poll_idx = -1;
267 for(i=0; i<nb_cur_sockets; i++)
268 {
269 if (ctx->sockets[i].timeout < 0) continue;
270 poll_idx++;
271 if (ctx->pollfds[poll_idx].fd != ctx->sockets[i].socket)
272 {
273 if (ctx->quiet < 0)
274 syslog(LOG_ERR, "Socket not found but present in poll fds");
275 continue;
276 }
277
278 // Error
279 if (ctx->pollfds[poll_idx].revents & POLL_ERR_MASK)
280 {
281 if (ctx->quiet < 0)
282 syslog(LOG_ERR, "Error (or closed) socket %d", ctx->sockets[i].socket);
283 close_socket(&ctx->sockets[i]);
284 }
285 // Someone is speaking
286 else if (ctx->pollfds[poll_idx].revents & POLLIN)
287 {
288 ctx->sockets[i].timeout = ctx->max_timeout*1000;
289 ret = handle_request(ctx, ctx->sockets[i].socket);
290 if (ret == 1)
291 {
292 if (ctx->quiet < 0)
293 syslog(LOG_DEBUG, "Client has closed socket %d",
294 ctx->sockets[i].socket);
295 close_socket(&ctx->sockets[i]);
296 }
297 // No more requests accepted
298 else if (!ctx->sockets[i].nb_remaining_requests--)
299 {
300 if (ctx->quiet < 0)
301 syslog(LOG_DEBUG, "Max requests reached for socket %d",
302 ctx->sockets[i].socket);
303 syncfs(ctx->sockets[i].socket);
304 close_socket(&ctx->sockets[i]);
305 }
306 }
307 else
308 {
309 ctx->sockets[i].timeout -= (time_res.tv_sec*1000000 + time_res.tv_usec);
310 if (ctx->sockets[i].timeout <= 0)
311 close_socket(&ctx->sockets[i]);
312 }
313 }
314 }
315 };
316
317 delete_thread(ctx);
318
319 pthread_exit(NULL);
320
321 return NULL;
322}
323
324static inline thread_ctx_t* create_thread_ctx(struct gengetopt_args_info* params)
325{
326 thread_ctx_t* thread_ctx = malloc(sizeof(*thread_ctx));
327
328 if (params->verbose_flag)
329 syslog(LOG_DEBUG, "Create a new thread %p", thread_ctx);
330
331 thread_ctx->sockets = malloc(sizeof(*thread_ctx->sockets)*params->sockets_per_thread_arg);
332 thread_ctx->pollfds = malloc(sizeof(*thread_ctx->pollfds)*params->sockets_per_thread_arg);
333 thread_ctx->nb_cur_sockets = 0;
334 thread_ctx->nb_available_sockets = params->sockets_per_thread_arg;
335 thread_ctx->max_timeout = params->sockets_timeout_arg*1000;
336 thread_ctx->stop = 0;
337 thread_ctx->quiet = params->quiet_flag;
338 if (params->verbose_flag)
339 thread_ctx->quiet = -1;
340 thread_ctx->prev = NULL;
341 pthread_mutex_init(&thread_ctx->mutex, NULL);
342
343 thread_ctx->next = s_last_thread;
344 if (s_last_thread)
345 s_last_thread->prev = thread_ctx;
346 s_last_thread = thread_ctx;
347
348 return thread_ctx;
349}
350
351static void fill_new_socket(struct gengetopt_args_info* params, int socket)
352{
353 thread_ctx_t* thread_ctx;
354 int launch_thread = 0;
355
356 pthread_mutex_lock(&s_fastmutex);
357
358 thread_ctx = s_last_thread;
359 if (!thread_ctx || !thread_ctx->nb_available_sockets)
360 {
361 thread_ctx = create_thread_ctx(params);
362 launch_thread = 1;
363 }
364
365 pthread_mutex_unlock(&s_fastmutex);
366
367 thread_ctx->sockets[thread_ctx->nb_cur_sockets].socket = socket;
368 thread_ctx->sockets[thread_ctx->nb_cur_sockets].timeout = thread_ctx->max_timeout*1000; // ms -> us
369 thread_ctx->sockets[thread_ctx->nb_cur_sockets].nb_remaining_requests = params->client_max_requests_arg;
370
371 pthread_mutex_lock(&thread_ctx->mutex);
372 thread_ctx->nb_cur_sockets++;
373 thread_ctx->nb_available_sockets--;
374 pthread_mutex_unlock(&thread_ctx->mutex);
375
376 if (launch_thread)
377 pthread_create(&thread_ctx->thread, NULL, thread_loop, thread_ctx);
378}
379
380int daemonize(struct gengetopt_args_info* params)
381{
382 int ret;
383 struct sockaddr_in sockaddr;
384 socklen_t sockaddr_len;
385 int new_socket;
386 void* thread_ret;
387
388 // Should have both ipv4 & ipv6
389 s_server_socket = socket(AF_INET, SOCK_STREAM, 0); // Should have both TCP & UDP
390
391 if (!s_server_socket)
392 {
393 if (!params->quiet_flag)
394 fprintf(stderr, "Unable to create socket (%m)\n");
395 return -1;
396 }
397
398 memset(&sockaddr, 0, sizeof(sockaddr));
399 sockaddr.sin_family = AF_INET; // Should detect interface type (v4 or v6)
400 sockaddr.sin_port = htons(params->port_arg);
401 if (params->bind_ip_given)
402 {
403 ret = inet_aton(params->bind_ip_arg, &sockaddr.sin_addr);
404 if (ret)
405 {
406 if (!params->quiet_flag)
407 fprintf(stderr, "Error with bind address %s (%m)\n", params->bind_ip_arg);
408 return -1;
409 }
410 }
411 else
412 sockaddr.sin_addr.s_addr = INADDR_ANY;
413
414 ret = bind(s_server_socket, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
415 if (ret)
416 {
417 if (!params->quiet_flag)
418 fprintf(stderr, "Unable to bind (%m)\n");
419 return -2;
420 }
421
422 ret = listen(s_server_socket, 0);
423 if (ret)
424 {
425 if (!params->quiet_flag)
426 fprintf(stderr, "Unable to listen (%m)\n");
427 return -3;
428 }
429
430 if (!params->no_background_flag)
431 {
432 ret = daemon(0, 0);
433 if (ret)
434 {
435 if (!params->quiet_flag)
436 fprintf(stderr, "Daemon error (%m)\n");
437 return -4;
438 }
439 }
440
441 openlog("ip_to_geod", 0, LOG_DAEMON);
442
443 syslog(LOG_INFO, "ip_togeod started\n");
444
445 signal(SIGINT, sigint);
446 signal(SIGUSR1, sigint);
447 signal(SIGUSR2, sigint);
448
449#ifdef USE_SECCOMP
450 scmp_filter_ctx seccomp_ctx = seccomp_init(SCMP_ACT_KILL);
451
452 if (seccomp_ctx == NULL)
453 {
454 syslog(LOG_ERR, "unable to initialize seccomp\n");
455 return -5;
456 }
457
458 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(read), 0);
459 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(write), 0);
460 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(close), 0);
461 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(accept), 0);
462#endif
463
464 while (!s_stop)
465 {
466 sockaddr_len = sizeof(sockaddr);
467 new_socket = accept(s_server_socket, (struct sockaddr *) &sockaddr, &sockaddr_len);
468 if (new_socket < 0)
469 {
470 if (!s_stop)
471 syslog(LOG_ERR, "accept error (%m), exiting");
472 break;
473 }
474 if (!params->quiet_flag)
475 syslog(LOG_INFO, "new connection from %s, socket %d",
476 inet_ntoa(sockaddr.sin_addr), new_socket);
477 fill_new_socket(params, new_socket);
478 }
479
480 close(s_server_socket);
481
482 while (s_last_thread)
483 {
484 s_last_thread->stop = 1;
485 pthread_join(s_last_thread->thread, &thread_ret);
486 }
487
488 closelog();
489
490#ifdef USE_SECCOMP
491 if (seccomp_ctx)
492 seccomp_release(seccomp_ctx);
493#endif
494
495 return 0;
496}
497

Archive Download this file

Branches

Tags