IP to Geo

IP to Geo Git Source Tree

Root/src/server.c

1/*
2 Copyright 2016 Grégory Soutadé
3
4 This file is part of iptogeo.
5
6 iptogeo is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 3 of the License, or
9 (at your option) any later version.
10
11 iptogeo is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with iptogeo. If not, see <http://www.gnu.org/licenses/>.
18*/
19
20#define _GNU_SOURCE 1 // for POLLRDHUP && syncfs
21#include <sys/types.h>
22#include <sys/socket.h>
23#include <sys/select.h>
24#include <sys/time.h>
25#include <sys/stat.h>
26#include <arpa/inet.h>
27#include <netinet/in.h>
28#include <time.h>
29#include <syslog.h>
30#include <signal.h>
31#include <stdio.h>
32#include <unistd.h>
33#include <pthread.h>
34#include <stdlib.h>
35#include <string.h>
36#include <poll.h>
37#include <fcntl.h>
38
39#ifdef USE_SECCOMP
40#include <seccomp.h>
41#endif
42
43#include "ip_to_geo.h"
44#include "protocol.h"
45
/*
 * Back-off bounds used by a worker thread that currently has no socket to
 * poll. The resulting delay is handed to usleep(), so both are microseconds.
 */
enum {
    WAIT_TIME     = 100, /* starting value of the idle delay, and its increment */
    MAX_WAIT_TIME = 500  /* the idle delay stops growing once it reaches this */
};
48
/* Per-connection state stored in a worker thread's socket table. */
typedef struct {
    /* Descriptor of the accepted client connection. */
    int socket;
    /*
     * Remaining idle budget in µs. A slot is only polled while this is > 0;
     * close_socket() sets it to -1 to mark the slot as closed.
     */
    time_t timeout;
    /* Request budget left for this client, decremented after each request. */
    int nb_remaining_requests;
} socket_ctx_t;
54
55typedef struct thread_ctx_s{
56 struct thread_ctx_s* prev;
57 struct thread_ctx_s* next;
58 pthread_t thread;
59 socket_ctx_t* sockets;
60 int nb_cur_sockets;
61 int nb_available_sockets;
62 int max_timeout;
63 int max_sockets; // in ms
64 int stop;
65 int quiet;
66 pthread_mutex_t mutex;
67 struct pollfd * pollfds;
68} thread_ctx_t;
69
70static pthread_mutex_t s_fastmutex = PTHREAD_MUTEX_INITIALIZER;
71static thread_ctx_t* s_last_thread = NULL;
72static int s_server_socket = -1;
73static int s_stop = 0;
74
75void sigint(int sig)
76{
77 syslog(LOG_WARNING, "signal received, stopping threads");
78 s_stop = 1;
79 shutdown(s_server_socket, SHUT_RDWR);
80}
81
82static int check_request(request_t* req)
83{
84 if (req->magic != REQ_MAGIC)
85 return REQ_ERR_BAD_MAGIC;
86
87 if (req->version != REQ_VERSION)
88 return REQ_ERR_BAD_VERSION;
89
90 if (req->ip_type != REQ_IPV4 &&
91 req->ip_type != REQ_IPV6)
92 return REQ_ERR_BAD_IP_VERSION;
93
94 if (req->ip_type != REQ_IPV4)
95 return REQ_ERR_UNSUPPORTED_IP_VERSION;
96
97 if (req->req != REQ_REQ)
98 return REQ_ERR_BAD_REQ_FIELD;
99
100 return REQ_ERR_NO_ERR;
101}
102
103static int handle_request(thread_ctx_t* thread_ctx, int socket)
104{
105 request_t req;
106 const uint8_t* geo;
107 int sent=0;
108 int ret = read(socket, &req, sizeof(req));
109
110 // Socket closed
111 if (ret == 0)
112 {
113 if (thread_ctx->quiet < 0)
114 syslog(LOG_DEBUG, "Socket %d closed", socket);
115 return 1;
116 }
117
118 if (thread_ctx->quiet < 0)
119 syslog(LOG_DEBUG, "New request");
120
121 if (ret != sizeof(req))
122 {
123 if (thread_ctx->quiet < 0)
124 syslog(LOG_DEBUG, "Invalid request size %d", ret);
125
126 return -1;
127 }
128
129 ret = check_request(&req);
130 req.req = REQ_RESP;
131
132 if (ret)
133 {
134 req.err = ret;
135 if (thread_ctx->quiet < 0)
136 syslog(LOG_DEBUG, "Request error %d", ret);
137 }
138 else
139 {
140 if (thread_ctx->quiet < 0)
141 {
142 char dst[64];
143 inet_ntop((req.ip_type == REQ_IPV4)?AF_INET:AF_INET6, req.ip, dst, sizeof(dst));
144 syslog(LOG_DEBUG, "Request for %s from socket %d", dst, socket);
145 }
146
147 geo = ip_to_geo(req.ip, req.ip_type);
148 if (!geo)
149 {
150 req.err = REQ_IP_NOT_FOUND;
151 if (thread_ctx->quiet < 0)
152 syslog(LOG_DEBUG, "Not found");
153 }
154 else
155 {
156 req.err = REQ_ERR_NO_ERR;
157 geo = get_country_code(geo);
158 if (thread_ctx->quiet < 0)
159 syslog(LOG_DEBUG, "Res %s", geo);
160 req.country_code[0] = geo[0];
161 req.country_code[1] = geo[1];
162 req.country_code[2] = 0;
163 req.country_code[3] = 0;
164 }
165 }
166
167 for (sent=0; sent < sizeof(req); sent += ret)
168 {
169 ret = write(socket, &((uint8_t*)&req)[sent], sizeof(req)-sent);
170 if (ret < 0)
171 return -1;
172 }
173
174 return 0;
175}
176
177static void delete_thread(thread_ctx_t* thread_ctx)
178{
179 int i;
180
181 pthread_mutex_lock(&s_fastmutex);
182 thread_ctx->nb_available_sockets = 0;
183
184 if (thread_ctx->quiet < 0)
185 syslog(LOG_DEBUG, "Delete thread %p", thread_ctx);
186
187 for(i=0; i<thread_ctx->nb_cur_sockets; i++)
188 {
189 if (thread_ctx->sockets[i].timeout > 0)
190 {
191 close (thread_ctx->sockets[i].socket);
192 }
193 }
194
195 free(thread_ctx->sockets);
196 free(thread_ctx->pollfds);
197
198 if (thread_ctx->next)
199 thread_ctx->next->prev = thread_ctx->prev;
200 if (thread_ctx->prev)
201 thread_ctx->prev->next = thread_ctx->next;
202
203 if (thread_ctx == s_last_thread)
204 s_last_thread = thread_ctx->next;
205 pthread_mutex_unlock(&s_fastmutex);
206
207 free(thread_ctx);
208}
209
210static inline void close_socket(socket_ctx_t* socket)
211{
212 socket->timeout = -1;
213 close(socket->socket);
214}
215
216#define POLL_ERR_MASK (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL)
217
218static void* thread_loop(void* param)
219{
220 thread_ctx_t* ctx = (thread_ctx_t*)param;
221 int i, ret, nfds, nb_cur_sockets, nb_available_sockets, poll_idx;
222 struct timeval time1, time2, time_res;
223 int wait_time = WAIT_TIME;
224
225 while (!ctx->stop)
226 {
227 nfds = 0;
228
229 pthread_mutex_lock(&ctx->mutex);
230 nb_cur_sockets = ctx->nb_cur_sockets;
231 nb_available_sockets = ctx->nb_available_sockets;
232 pthread_mutex_unlock(&ctx->mutex);
233
234 for(i=0; i<nb_cur_sockets; i++)
235 {
236 if (ctx->sockets[i].timeout > 0)
237 {
238 ctx->pollfds[nfds].fd = ctx->sockets[i].socket;
239 ctx->pollfds[nfds].events = POLLIN|POLL_ERR_MASK;
240 nfds++;
241 }
242 }
243
244 if (!nfds)
245 {
246 /*
247 No more active socket for this thread
248 nor available slots
249 */
250 if (!nb_available_sockets)
251 break;
252
253 if (wait_time < MAX_WAIT_TIME)
254 wait_time += WAIT_TIME;
255
256 usleep(wait_time);
257 continue;
258 }
259 else
260 wait_time = WAIT_TIME;
261
262 gettimeofday(&time1, NULL);
263 ret = poll(ctx->pollfds, nfds, ctx->max_timeout);
264 gettimeofday(&time2, NULL);
265
266 // Timeout, remove all current sockets
267 if (ret == 0)
268 {
269 if (ctx->quiet < 0)
270 syslog(LOG_DEBUG, "Timeout");
271
272 for(i=0; i<nb_cur_sockets; i++)
273 {
274 if (ctx->sockets[i].timeout > 0)
275 close_socket(&ctx->sockets[i]);
276 }
277 }
278 else if (ret < 0)
279 {
280 if (!s_stop && !ctx->stop)
281 syslog(LOG_WARNING, "poll has errors (%m)\n");
282 break;
283 }
284 else
285 {
286 timersub(&time2, &time1, &time_res);
287 poll_idx = -1;
288 for(i=0; i<nb_cur_sockets; i++)
289 {
290 if (ctx->sockets[i].timeout < 0) continue;
291 poll_idx++;
292 if (ctx->pollfds[poll_idx].fd != ctx->sockets[i].socket)
293 {
294 if (ctx->quiet < 0)
295 syslog(LOG_ERR, "Socket not found but present in poll fds");
296 continue;
297 }
298
299 // Error
300 if (ctx->pollfds[poll_idx].revents & POLL_ERR_MASK)
301 {
302 if (ctx->quiet < 0)
303 syslog(LOG_ERR, "Error (or closed) socket %d", ctx->sockets[i].socket);
304 close_socket(&ctx->sockets[i]);
305 }
306 // Someone is speaking
307 else if (ctx->pollfds[poll_idx].revents & POLLIN)
308 {
309 ctx->sockets[i].timeout = ctx->max_timeout*1000;
310 ret = handle_request(ctx, ctx->sockets[i].socket);
311 if (ret == 1)
312 {
313 if (ctx->quiet < 0)
314 syslog(LOG_DEBUG, "Client has closed socket %d",
315 ctx->sockets[i].socket);
316 close_socket(&ctx->sockets[i]);
317 }
318 // No more requests accepted
319 else if (!ctx->sockets[i].nb_remaining_requests--)
320 {
321 if (ctx->quiet < 0)
322 syslog(LOG_DEBUG, "Max requests reached for socket %d",
323 ctx->sockets[i].socket);
324 syncfs(ctx->sockets[i].socket);
325 close_socket(&ctx->sockets[i]);
326 }
327 }
328 else
329 {
330 ctx->sockets[i].timeout -= (time_res.tv_sec*1000000 + time_res.tv_usec);
331 if (ctx->sockets[i].timeout <= 0)
332 close_socket(&ctx->sockets[i]);
333 }
334 }
335 }
336 };
337
338 delete_thread(ctx);
339
340 pthread_exit(NULL);
341
342 return NULL;
343}
344
345static inline thread_ctx_t* create_thread_ctx(struct gengetopt_args_info* params)
346{
347 thread_ctx_t* thread_ctx = malloc(sizeof(*thread_ctx));
348
349 if (params->verbose_flag)
350 syslog(LOG_DEBUG, "Create a new thread %p", thread_ctx);
351
352 thread_ctx->sockets = malloc(sizeof(*thread_ctx->sockets)*params->sockets_per_thread_arg);
353 thread_ctx->pollfds = malloc(sizeof(*thread_ctx->pollfds)*params->sockets_per_thread_arg);
354 thread_ctx->nb_cur_sockets = 0;
355 thread_ctx->nb_available_sockets = params->sockets_per_thread_arg;
356 thread_ctx->max_timeout = params->sockets_timeout_arg*1000;
357 thread_ctx->stop = 0;
358 thread_ctx->quiet = params->quiet_flag;
359 if (params->verbose_flag)
360 thread_ctx->quiet = -1;
361 thread_ctx->prev = NULL;
362 pthread_mutex_init(&thread_ctx->mutex, NULL);
363
364 thread_ctx->next = s_last_thread;
365 if (s_last_thread)
366 s_last_thread->prev = thread_ctx;
367 s_last_thread = thread_ctx;
368
369 return thread_ctx;
370}
371
372static void fill_new_socket(struct gengetopt_args_info* params, int socket)
373{
374 thread_ctx_t* thread_ctx;
375 int launch_thread = 0;
376
377 pthread_mutex_lock(&s_fastmutex);
378
379 thread_ctx = s_last_thread;
380 if (!thread_ctx || !thread_ctx->nb_available_sockets)
381 {
382 thread_ctx = create_thread_ctx(params);
383 launch_thread = 1;
384 }
385
386 pthread_mutex_unlock(&s_fastmutex);
387
388 thread_ctx->sockets[thread_ctx->nb_cur_sockets].socket = socket;
389 thread_ctx->sockets[thread_ctx->nb_cur_sockets].timeout = thread_ctx->max_timeout*1000; // ms -> us
390 thread_ctx->sockets[thread_ctx->nb_cur_sockets].nb_remaining_requests = params->client_max_requests_arg;
391
392 pthread_mutex_lock(&thread_ctx->mutex);
393 thread_ctx->nb_cur_sockets++;
394 thread_ctx->nb_available_sockets--;
395 pthread_mutex_unlock(&thread_ctx->mutex);
396
397 if (launch_thread)
398 pthread_create(&thread_ctx->thread, NULL, thread_loop, thread_ctx);
399}
400
401int daemonize(struct gengetopt_args_info* params)
402{
403 int ret;
404 struct sockaddr_in sockaddr;
405 socklen_t sockaddr_len;
406 int new_socket;
407 void* thread_ret;
408
409 // Should have both ipv4 & ipv6
410 s_server_socket = socket(AF_INET, SOCK_STREAM, 0); // Should have both TCP & UDP
411
412 if (!s_server_socket)
413 {
414 if (!params->quiet_flag)
415 fprintf(stderr, "Unable to create socket (%m)\n");
416 return -1;
417 }
418
419 memset(&sockaddr, 0, sizeof(sockaddr));
420 sockaddr.sin_family = AF_INET; // Should detect interface type (v4 or v6)
421 sockaddr.sin_port = htons(params->port_arg);
422 if (params->bind_ip_given)
423 {
424 ret = inet_aton(params->bind_ip_arg, &sockaddr.sin_addr);
425 if (ret)
426 {
427 if (!params->quiet_flag)
428 fprintf(stderr, "Error with bind address %s (%m)\n", params->bind_ip_arg);
429 return -1;
430 }
431 }
432 else
433 sockaddr.sin_addr.s_addr = INADDR_ANY;
434
435 ret = bind(s_server_socket, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
436 if (ret)
437 {
438 if (!params->quiet_flag)
439 fprintf(stderr, "Unable to bind (%m)\n");
440 return -2;
441 }
442
443 ret = listen(s_server_socket, 0);
444 if (ret)
445 {
446 if (!params->quiet_flag)
447 fprintf(stderr, "Unable to listen (%m)\n");
448 return -3;
449 }
450
451 if (!params->no_background_flag)
452 {
453 ret = daemon(0, 0);
454 if (ret)
455 {
456 if (!params->quiet_flag)
457 fprintf(stderr, "Daemon error (%m)\n");
458 return -4;
459 }
460 }
461
462 openlog("ip_to_geod", 0, LOG_DAEMON);
463
464 syslog(LOG_INFO, "ip_togeod started\n");
465
466 signal(SIGINT, sigint);
467 signal(SIGUSR1, sigint);
468 signal(SIGUSR2, sigint);
469
470#ifdef USE_SECCOMP
471 scmp_filter_ctx seccomp_ctx = seccomp_init(SCMP_ACT_KILL/*SCMP_ACT_TRAP*/);
472
473 if (seccomp_ctx == NULL)
474 {
475 syslog(LOG_ERR, "unable to initialize seccomp\n");
476 return -5;
477 }
478
479 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(poll), 0);
480 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(read), 0);
481 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(write), 0);
482 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(close), 0);
483 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(shutdown), 0);
484 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(accept), 0);
485 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(syncfs), 0);
486 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(nanosleep), 0);
487 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(restart_syscall), 0); // for usleep
488 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(rt_sigreturn), 0); // for signal handler
489 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(syslog), 0);
490 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(sendto), 0); // For syslog
491 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(open), 1, SCMP_A1(SCMP_CMP_EQ , O_RDONLY|O_CLOEXEC));
492 // For threads
493 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(mmap), 0);
494 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(munmap), 0);
495 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(mprotect), 0);
496 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(clone), 0);
497 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(fstat), 0);
498 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(access), 0);
499 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(exit), 0);
500
501 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(set_robust_list), 0);
502 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(madvise), 0);
503 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(munlock), 0);
504 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(futex), 0);
505
506 ret = seccomp_load(seccomp_ctx);
507 if (ret < 0)
508 {
509 syslog(LOG_ERR, "Error seccomp load %d\n", ret);
510 return -6;
511 }
512#endif
513
514 while (!s_stop)
515 {
516 sockaddr_len = sizeof(sockaddr);
517 new_socket = accept(s_server_socket, (struct sockaddr *) &sockaddr, &sockaddr_len);
518 if (new_socket < 0)
519 {
520 if (!s_stop)
521 syslog(LOG_ERR, "accept error (%m), exiting");
522 break;
523 }
524 if (!params->quiet_flag)
525 syslog(LOG_INFO, "new connection from %s, socket %d",
526 inet_ntoa(sockaddr.sin_addr), new_socket);
527 fill_new_socket(params, new_socket);
528 }
529
530 close(s_server_socket);
531
532 while (s_last_thread)
533 {
534 s_last_thread->stop = 1;
535 pthread_join(s_last_thread->thread, &thread_ret);
536 }
537
538 closelog();
539
540#ifdef USE_SECCOMP
541 if (seccomp_ctx)
542 seccomp_release(seccomp_ctx);
543#endif
544
545 return 0;
546}
547

Archive Download this file

Branches

Tags