IP to Geo

IP to Geo Git Source Tree

Root/src/server.c

1/*
2 Copyright 2016 Grégory Soutadé
3
4 This file is part of iptogeo.
5
6 iptogeo is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 3 of the License, or
9 (at your option) any later version.
10
11 iptogeo is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with iptogeo. If not, see <http://www.gnu.org/licenses/>.
18*/
19
20#define _GNU_SOURCE 1 // for POLLRDHUP && syncfs
21#include <sys/types.h>
22#include <sys/socket.h>
23#include <sys/select.h>
24#include <sys/time.h>
25#include <sys/stat.h>
26#include <arpa/inet.h>
27#include <netinet/in.h>
28#include <netdb.h>
29#include <time.h>
30#include <syslog.h>
31#include <signal.h>
32#include <stdio.h>
33#include <unistd.h>
34#include <pthread.h>
35#include <stdlib.h>
36#include <string.h>
37#include <poll.h>
38#include <fcntl.h>
39
40#ifdef USE_SECCOMP
41#include <seccomp.h>
42#endif
43
44#include "ip_to_geo.h"
45#include "protocol.h"
46
/*
 * Idle back-off of a worker thread that currently has no active socket.
 * Both values end up in usleep(), so they are microseconds.
 */
#define WAIT_TIME 100       /* initial sleep and increment step */
#define MAX_WAIT_TIME 500   /* the sleep stops growing once it reaches this */

/* Capacity of the listening socket tables */
#define MAX_LISTENING_SOCKETS 16
51
/* State kept by a worker thread for one client connection. */
typedef struct {
    int socket;                 /* client file descriptor */
    time_t timeout;             /* remaining idle budget in microseconds, -1 once closed */
    int nb_remaining_requests;  /* countdown of requests still accepted on this socket */
} socket_ctx_t;
57
58typedef struct thread_ctx_s{
59 struct thread_ctx_s* prev;
60 struct thread_ctx_s* next;
61 pthread_t thread;
62 socket_ctx_t* sockets;
63 int nb_cur_sockets;
64 int nb_available_sockets;
65 int max_timeout;
66 int max_sockets; // in ms
67 int stop;
68 int quiet;
69 pthread_mutex_t mutex;
70 struct pollfd * pollfds;
71} thread_ctx_t;
72
73static pthread_mutex_t s_fastmutex = PTHREAD_MUTEX_INITIALIZER;
74static thread_ctx_t* s_last_thread = NULL;
75static int s_server_sockets[MAX_LISTENING_SOCKETS];
76static int s_nb_server_sockets = 0;
77static int s_stop = 0;
78
79void sigint(int sig)
80{
81 int i;
82
83 syslog(LOG_WARNING, "signal received, stopping threads");
84 s_stop = 1;
85
86 for (i=0; i<s_nb_server_sockets; i++)
87 shutdown(s_server_sockets[i], SHUT_RDWR);
88}
89
90static int check_request(request_t* req)
91{
92 if (req->magic != REQ_MAGIC)
93 return REQ_ERR_BAD_MAGIC;
94
95 if (req->version != REQ_VERSION)
96 return REQ_ERR_BAD_VERSION;
97
98 if (req->ip_type != REQ_IPV4 &&
99 req->ip_type != REQ_IPV6)
100 return REQ_ERR_BAD_IP_VERSION;
101
102 if (req->ip_type != REQ_IPV4 && req->ip_type != REQ_IPV6)
103 return REQ_ERR_UNSUPPORTED_IP_VERSION;
104
105 if (req->req != REQ_REQ)
106 return REQ_ERR_BAD_REQ_FIELD;
107
108 return REQ_ERR_NO_ERR;
109}
110
111static int handle_request(thread_ctx_t* thread_ctx, int socket)
112{
113 request_t req;
114 const uint8_t* geo;
115 int sent=0;
116 int ret = read(socket, &req, sizeof(req));
117
118 // Socket closed
119 if (ret == 0)
120 {
121 if (thread_ctx->quiet < 0)
122 syslog(LOG_DEBUG, "Socket %d closed", socket);
123 return 1;
124 }
125
126 if (thread_ctx->quiet < 0)
127 syslog(LOG_DEBUG, "New request");
128
129 if (ret != sizeof(req))
130 {
131 if (thread_ctx->quiet < 0)
132 syslog(LOG_DEBUG, "Invalid request size %d", ret);
133
134 return -1;
135 }
136
137 ret = check_request(&req);
138 req.req = REQ_RESP;
139
140 if (ret)
141 {
142 req.err = ret;
143 if (thread_ctx->quiet < 0)
144 syslog(LOG_DEBUG, "Request error %d", ret);
145 }
146 else
147 {
148 if (thread_ctx->quiet < 0)
149 {
150 char dst[64];
151 inet_ntop((req.ip_type == REQ_IPV4)?AF_INET:AF_INET6, req.ip, dst, sizeof(dst));
152 syslog(LOG_DEBUG, "Request for %s from socket %d", dst, socket);
153 }
154
155 geo = ip_to_geo(req.ip, req.ip_type);
156 if (!geo)
157 {
158 req.err = REQ_IP_NOT_FOUND;
159 if (thread_ctx->quiet < 0)
160 syslog(LOG_DEBUG, "Not found");
161 }
162 else
163 {
164 req.err = REQ_ERR_NO_ERR;
165 geo = get_country_code(geo);
166 if (thread_ctx->quiet < 0)
167 syslog(LOG_DEBUG, "Res %s", geo);
168 req.country_code[0] = geo[0];
169 req.country_code[1] = geo[1];
170 req.country_code[2] = 0;
171 req.country_code[3] = 0;
172 }
173 }
174
175 for (sent=0; sent < sizeof(req); sent += ret)
176 {
177 ret = write(socket, &((uint8_t*)&req)[sent], sizeof(req)-sent);
178 if (ret < 0)
179 return -1;
180 }
181
182 return 0;
183}
184
185static void delete_thread(thread_ctx_t* thread_ctx)
186{
187 int i;
188
189 pthread_mutex_lock(&s_fastmutex);
190 thread_ctx->nb_available_sockets = 0;
191
192 if (thread_ctx->quiet < 0)
193 syslog(LOG_DEBUG, "Delete thread %p", thread_ctx);
194
195 for(i=0; i<thread_ctx->nb_cur_sockets; i++)
196 {
197 if (thread_ctx->sockets[i].timeout > 0)
198 {
199 close (thread_ctx->sockets[i].socket);
200 }
201 }
202
203 free(thread_ctx->sockets);
204 free(thread_ctx->pollfds);
205
206 if (thread_ctx->next)
207 thread_ctx->next->prev = thread_ctx->prev;
208 if (thread_ctx->prev)
209 thread_ctx->prev->next = thread_ctx->next;
210
211 if (thread_ctx == s_last_thread)
212 s_last_thread = thread_ctx->next;
213 pthread_mutex_unlock(&s_fastmutex);
214
215 free(thread_ctx);
216}
217
218static inline void close_socket(socket_ctx_t* socket)
219{
220 socket->timeout = -1;
221 close(socket->socket);
222}
223
224#define POLL_ERR_MASK (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL)
225
226static void* thread_loop(void* param)
227{
228 thread_ctx_t* ctx = (thread_ctx_t*)param;
229 int i, ret, nfds, nb_cur_sockets, nb_available_sockets, poll_idx;
230 struct timeval time1, time2, time_res;
231 int wait_time = WAIT_TIME;
232
233 while (!ctx->stop)
234 {
235 nfds = 0;
236
237 pthread_mutex_lock(&ctx->mutex);
238 nb_cur_sockets = ctx->nb_cur_sockets;
239 nb_available_sockets = ctx->nb_available_sockets;
240 pthread_mutex_unlock(&ctx->mutex);
241
242 for(i=0; i<nb_cur_sockets; i++)
243 {
244 if (ctx->sockets[i].timeout > 0)
245 {
246 ctx->pollfds[nfds].fd = ctx->sockets[i].socket;
247 ctx->pollfds[nfds].events = POLLIN|POLL_ERR_MASK;
248 nfds++;
249 }
250 }
251
252 if (!nfds)
253 {
254 /*
255 No more active socket for this thread
256 nor available slots
257 */
258 if (!nb_available_sockets)
259 break;
260
261 if (wait_time < MAX_WAIT_TIME)
262 wait_time += WAIT_TIME;
263
264 usleep(wait_time);
265 continue;
266 }
267 else
268 wait_time = WAIT_TIME;
269
270 gettimeofday(&time1, NULL);
271 ret = poll(ctx->pollfds, nfds, ctx->max_timeout);
272 gettimeofday(&time2, NULL);
273
274 // Timeout, remove all current sockets
275 if (ret == 0)
276 {
277 if (ctx->quiet < 0)
278 syslog(LOG_DEBUG, "Timeout");
279
280 for(i=0; i<nb_cur_sockets; i++)
281 {
282 if (ctx->sockets[i].timeout > 0)
283 close_socket(&ctx->sockets[i]);
284 }
285 }
286 else if (ret < 0)
287 {
288 if (!s_stop && !ctx->stop)
289 syslog(LOG_WARNING, "poll has errors (%m)\n");
290 break;
291 }
292 else
293 {
294 timersub(&time2, &time1, &time_res);
295 poll_idx = -1;
296 for(i=0; i<nb_cur_sockets; i++)
297 {
298 if (ctx->sockets[i].timeout < 0) continue;
299 poll_idx++;
300 if (ctx->pollfds[poll_idx].fd != ctx->sockets[i].socket)
301 {
302 if (ctx->quiet < 0)
303 syslog(LOG_ERR, "Socket not found but present in poll fds");
304 continue;
305 }
306
307 // Error
308 if (ctx->pollfds[poll_idx].revents & POLL_ERR_MASK)
309 {
310 if (ctx->quiet < 0)
311 syslog(LOG_ERR, "Error (or closed) socket %d", ctx->sockets[i].socket);
312 close_socket(&ctx->sockets[i]);
313 }
314 // Someone is speaking
315 else if (ctx->pollfds[poll_idx].revents & POLLIN)
316 {
317 ctx->sockets[i].timeout = ctx->max_timeout*1000;
318 ret = handle_request(ctx, ctx->sockets[i].socket);
319 if (ret == 1)
320 {
321 if (ctx->quiet < 0)
322 syslog(LOG_DEBUG, "Client has closed socket %d",
323 ctx->sockets[i].socket);
324 close_socket(&ctx->sockets[i]);
325 }
326 // No more requests accepted
327 else if (!ctx->sockets[i].nb_remaining_requests--)
328 {
329 if (ctx->quiet < 0)
330 syslog(LOG_DEBUG, "Max requests reached for socket %d",
331 ctx->sockets[i].socket);
332 syncfs(ctx->sockets[i].socket);
333 close_socket(&ctx->sockets[i]);
334 }
335 }
336 else
337 {
338 ctx->sockets[i].timeout -= (time_res.tv_sec*1000000 + time_res.tv_usec);
339 if (ctx->sockets[i].timeout <= 0)
340 close_socket(&ctx->sockets[i]);
341 }
342 }
343 }
344 };
345
346 delete_thread(ctx);
347
348 pthread_exit(NULL);
349
350 return NULL;
351}
352
353static inline thread_ctx_t* create_thread_ctx(struct gengetopt_args_info* params)
354{
355 thread_ctx_t* thread_ctx = malloc(sizeof(*thread_ctx));
356
357 if (params->verbose_flag)
358 syslog(LOG_DEBUG, "Create a new thread %p", thread_ctx);
359
360 thread_ctx->sockets = malloc(sizeof(*thread_ctx->sockets)*params->sockets_per_thread_arg);
361 thread_ctx->pollfds = malloc(sizeof(*thread_ctx->pollfds)*params->sockets_per_thread_arg);
362 thread_ctx->nb_cur_sockets = 0;
363 thread_ctx->nb_available_sockets = params->sockets_per_thread_arg;
364 thread_ctx->max_timeout = params->sockets_timeout_arg*1000;
365 thread_ctx->stop = 0;
366 thread_ctx->quiet = params->quiet_flag;
367 if (params->verbose_flag)
368 thread_ctx->quiet = -1;
369 thread_ctx->prev = NULL;
370 pthread_mutex_init(&thread_ctx->mutex, NULL);
371
372 thread_ctx->next = s_last_thread;
373 if (s_last_thread)
374 s_last_thread->prev = thread_ctx;
375 s_last_thread = thread_ctx;
376
377 return thread_ctx;
378}
379
380static void fill_new_socket(struct gengetopt_args_info* params, int socket)
381{
382 thread_ctx_t* thread_ctx;
383 int launch_thread = 0;
384
385 pthread_mutex_lock(&s_fastmutex);
386
387 thread_ctx = s_last_thread;
388 if (!thread_ctx || !thread_ctx->nb_available_sockets)
389 {
390 thread_ctx = create_thread_ctx(params);
391 launch_thread = 1;
392 }
393
394 pthread_mutex_unlock(&s_fastmutex);
395
396 thread_ctx->sockets[thread_ctx->nb_cur_sockets].socket = socket;
397 thread_ctx->sockets[thread_ctx->nb_cur_sockets].timeout = thread_ctx->max_timeout*1000; // ms -> us
398 thread_ctx->sockets[thread_ctx->nb_cur_sockets].nb_remaining_requests = params->client_max_requests_arg;
399
400 pthread_mutex_lock(&thread_ctx->mutex);
401 thread_ctx->nb_cur_sockets++;
402 thread_ctx->nb_available_sockets--;
403 pthread_mutex_unlock(&thread_ctx->mutex);
404
405 if (launch_thread)
406 pthread_create(&thread_ctx->thread, NULL, thread_loop, thread_ctx);
407}
408
/*
 * Server entry point: open the listening sockets, optionally go to
 * background, install the signal handlers (and the seccomp filter when built
 * with USE_SECCOMP), then accept connections and dispatch them to worker
 * threads until a stop is requested.
 *
 * Returns 0 after a normal shutdown, or a negative value:
 *   -1 address resolution, socket creation or bind address error
 *   -2 bind() failure
 *   -3 listen() failure
 *   -4 daemon() failure
 *   -5 seccomp initialisation failure
 *   -6 seccomp filter load failure
 */
int daemonize(struct gengetopt_args_info* params)
{
    int ret, i;
    // One storage for both families, accessed through the matching member
    union {
        struct sockaddr sa;
        struct sockaddr_in v4;
        struct sockaddr_in6 v6;
    } addr;
    socklen_t addr_len;
    void* bind_ip;
    int new_socket;
    void* thread_ret;
    struct addrinfo hints;
    struct addrinfo *result, *rp;
    char buffer[64];
    int on;
    struct pollfd pollfds[MAX_LISTENING_SOCKETS];
#ifdef USE_SECCOMP
    scmp_filter_ctx seccomp_ctx;
#endif

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM; /* Stream socket */
    hints.ai_flags = (params->bind_ip_given)?0:AI_PASSIVE;    /* For wildcard IP address */
    hints.ai_protocol = 0;          /* Any protocol */

    snprintf(buffer, sizeof(buffer), "%d", params->port_arg);

    ret = getaddrinfo(params->bind_ip_arg, buffer, &hints, &result);
    if (ret)
    {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(ret));
        return -1;
    }

    // One listening socket per address returned by getaddrinfo()
    for (rp=result; rp && s_nb_server_sockets < MAX_LISTENING_SOCKETS;
         rp=rp->ai_next, s_nb_server_sockets++)
    {
        new_socket = socket(rp->ai_family,
                            rp->ai_socktype,
                            rp->ai_protocol);

        // socket() reports failure with -1 (0 is a valid descriptor)
        if (new_socket < 0)
        {
            if (!params->quiet_flag)
                fprintf(stderr, "Unable to create socket (%m)\n");
            freeaddrinfo(result);
            return -1;
        }

        s_server_sockets[s_nb_server_sockets] = new_socket;
        pollfds[s_nb_server_sockets].fd = new_socket;
        pollfds[s_nb_server_sockets].events = POLLIN|POLL_ERR_MASK;

        on=1; setsockopt(new_socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
        if (rp->ai_family == AF_INET6)
        {
            on=1; setsockopt(new_socket, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on));
        }

        // Zeroed address == wildcard address for both families
        memset(&addr, 0, sizeof(addr));
        if (rp->ai_family == AF_INET)
        {
            addr.v4.sin_family = AF_INET;
            addr.v4.sin_port = htons(params->port_arg);
            bind_ip = &addr.v4.sin_addr;
            addr_len = sizeof(addr.v4);
        }
        else
        {
            addr.v6.sin6_family = rp->ai_family;
            addr.v6.sin6_port = htons(params->port_arg);
            bind_ip = &addr.v6.sin6_addr;
            addr_len = sizeof(addr.v6);
        }

        if (params->bind_ip_given)
        {
            // inet_pton() returns 1 on success, 0 or -1 otherwise
            ret = inet_pton(rp->ai_family, params->bind_ip_arg, bind_ip);
            if (ret != 1)
            {
                if (!params->quiet_flag)
                    fprintf(stderr, "Error with bind address %s (%m)\n", params->bind_ip_arg);
                close(new_socket);
                freeaddrinfo(result);
                return -1;
            }
        }

        ret = bind(new_socket, &addr.sa, addr_len);
        if (ret)
        {
            if (!params->quiet_flag)
                fprintf(stderr, "Unable to bind (%m)\n");
            close(new_socket);
            freeaddrinfo(result);
            return -2;
        }

        ret = listen(new_socket, 0);
        if (ret)
        {
            if (!params->quiet_flag)
                fprintf(stderr, "Unable to listen (%m)\n");
            close(new_socket);
            freeaddrinfo(result);
            return -3;
        }
    }

    // Addresses left over: the socket table is full
    if (rp)
        fprintf(stderr, "Warning, max listening sockets reached !!\n");

    freeaddrinfo(result);

    if (!params->no_background_flag)
    {
        ret = daemon(0, 0);
        if (ret)
        {
            if (!params->quiet_flag)
                fprintf(stderr, "Daemon error (%m)\n");
            return -4;
        }
    }

    openlog("ip_to_geod", 0, LOG_DAEMON);

    syslog(LOG_INFO, "ip_togeod started\n");

    signal(SIGINT, sigint);
    signal(SIGUSR1, sigint);
    signal(SIGUSR2, sigint);
    // A client closing early must not kill the process on write()
    signal(SIGPIPE, SIG_IGN);

#ifdef USE_SECCOMP
    seccomp_ctx = seccomp_init(SCMP_ACT_KILL/*SCMP_ACT_TRAP*/);

    if (seccomp_ctx == NULL)
    {
        syslog(LOG_ERR, "unable to initialize seccomp\n");
        return -5;
    }

    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(poll), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(read), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(write), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(close), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(shutdown), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(accept), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(syncfs), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(nanosleep), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(restart_syscall), 0); // for usleep
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(rt_sigreturn), 0); // for signal handler
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(syslog), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(sendto), 0); // For syslog
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(open), 1, SCMP_A1(SCMP_CMP_EQ , O_RDONLY|O_CLOEXEC));
    // For threads
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(mmap), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(munmap), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(mprotect), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(clone), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(fstat), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(access), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(exit), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(exit_group), 0);

    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(set_robust_list), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(madvise), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(munlock), 0);
    seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(futex), 0);

    ret = seccomp_load(seccomp_ctx);
    if (ret < 0)
    {
        syslog(LOG_ERR, "Error seccomp load %d\n", ret);
        seccomp_release(seccomp_ctx);
        return -6;
    }
#endif

    while (!s_stop)
    {
        ret = poll(pollfds, s_nb_server_sockets, 1000);

        if (ret < 0)
        {
            if (!s_stop)
                syslog(LOG_ERR, "main poll err %d", ret);
            break;
        }

        if (ret == 0) continue; // timeout

        for (i=0; i<s_nb_server_sockets; i++)
        {
            // Listening socket already closed after an error
            if (pollfds[i].fd < 0) continue;

            if (pollfds[i].revents & POLL_ERR_MASK)
            {
                if (!s_stop)
                    syslog(LOG_ERR, "Error with main socket %d (%m)", s_server_sockets[i]);
                close (s_server_sockets[i]);
                // poll() ignores negative descriptors; also prevents a
                // second close() of a number that may have been reused
                s_server_sockets[i] = -1;
                pollfds[i].fd = -1;
                continue;
            }

            if (!(pollfds[i].revents & POLLIN)) continue;

            addr_len = sizeof(addr);
            new_socket = accept(s_server_sockets[i], &addr.sa, &addr_len);
            if (new_socket < 0)
            {
                if (!s_stop)
                    syslog(LOG_ERR, "accept error (%m), exiting");
                goto end;
            }

            if (!params->quiet_flag)
            {
                switch(addr.sa.sa_family)
                {
                case AF_INET:
                    inet_ntop(AF_INET, &addr.v4.sin_addr, buffer, sizeof(buffer));
                    break;
                case AF_INET6:
                    inet_ntop(AF_INET6, &addr.v6.sin6_addr, buffer, sizeof(buffer));
                    break;
                default:
                    // Do not log stale buffer content for other families
                    snprintf(buffer, sizeof(buffer), "unknown");
                    break;
                }
                syslog(LOG_INFO, "new connection from %s, socket %d", buffer, new_socket);
            }
            fill_new_socket(params, new_socket);
        }
    }

end:
    if (s_stop)
        syslog(LOG_INFO, "stop requested, shutting down");

    for (i=0; i<s_nb_server_sockets; i++)
    {
        if (s_server_sockets[i] >= 0)
            close(s_server_sockets[i]);
    }

    /*
      Stop and join the worker threads one by one. Each thread unlinks and
      frees its own context before exiting, so the list head is only read
      with the list lock held and the thread id is copied before unlocking.
    */
    for (;;)
    {
        pthread_t tid;

        pthread_mutex_lock(&s_fastmutex);
        if (!s_last_thread)
        {
            pthread_mutex_unlock(&s_fastmutex);
            break;
        }
        s_last_thread->stop = 1;
        tid = s_last_thread->thread;
        pthread_mutex_unlock(&s_fastmutex);

        pthread_join(tid, &thread_ret);
    }

    closelog();

#ifdef USE_SECCOMP
    if (seccomp_ctx)
        seccomp_release(seccomp_ctx);
#endif

    return 0;
}
631

Archive Download this file

Branches

Tags