IP to Geo

IP to Geo Git Source Tree

Root/src/server.c

1/*
2 Copyright 2016 Grégory Soutadé
3
4 This file is part of iptogeo.
5
6 iptogeo is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 3 of the License, or
9 (at your option) any later version.
10
11 iptogeo is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with iptogeo. If not, see <http://www.gnu.org/licenses/>.
18*/
19
20#define _GNU_SOURCE 1 // for POLLRDHUP && syncfs
21#include <sys/types.h>
22#include <sys/socket.h>
23#include <sys/select.h>
24#include <sys/time.h>
25#include <arpa/inet.h>
26#include <netinet/in.h>
27#include <time.h>
28#include <syslog.h>
29#include <signal.h>
30#include <stdio.h>
31#include <unistd.h>
32#include <pthread.h>
33#include <stdlib.h>
34#include <string.h>
35#include <poll.h>
36
37#ifdef USE_SECCOMP
38#include <seccomp.h>
39#endif
40
41#include "ip_to_geo.h"
42#include "protocol.h"
43
/*
 * Idle back-off used by a worker thread that currently has no socket to poll.
 * Both values are handed to usleep(), hence expressed in microseconds:
 * the sleep grows by WAIT_TIME on each idle round until MAX_WAIT_TIME.
 */
enum
{
    WAIT_TIME     = 100,
    MAX_WAIT_TIME = 500
};
46
/*
 * Bookkeeping for one accepted client connection.
 */
typedef struct {
    /* File descriptor of the client connection. */
    int socket;
    /*
     * Remaining inactivity budget, in microseconds.
     * A value that is not strictly positive marks the slot as closed.
     */
    time_t timeout;
    /* Request budget of the connection, decremented after each request. */
    int nb_remaining_requests;
} socket_ctx_t;
52
53typedef struct thread_ctx_s{
54 struct thread_ctx_s* prev;
55 struct thread_ctx_s* next;
56 pthread_t thread;
57 socket_ctx_t* sockets;
58 int nb_cur_sockets;
59 int nb_available_sockets;
60 int max_timeout;
61 int max_sockets; // in ms
62 int stop;
63 int quiet;
64 pthread_mutex_t mutex;
65 struct pollfd * pollfds;
66} thread_ctx_t;
67
68static pthread_mutex_t s_fastmutex = PTHREAD_MUTEX_INITIALIZER;
69static thread_ctx_t* s_last_thread = NULL;
70static int s_server_socket = -1;
71static int s_stop = 0;
72
73void sigint(int sig)
74{
75 syslog(LOG_WARNING, "signal received, stopping threads");
76 s_stop = 1;
77 shutdown(s_server_socket, SHUT_RDWR);
78}
79
80static int check_request(request_t* req)
81{
82 if (req->magic != REQ_MAGIC)
83 return REQ_ERR_BAD_MAGIC;
84
85 if (req->version != REQ_VERSION)
86 return REQ_ERR_BAD_VERSION;
87
88 if (req->ip_type != REQ_IPV4 &&
89 req->ip_type != REQ_IPV6)
90 return REQ_ERR_BAD_IP_VERSION;
91
92 if (req->ip_type != REQ_IPV4)
93 return REQ_ERR_UNSUPPORTED_IP_VERSION;
94
95 if (req->req != REQ_REQ)
96 return REQ_ERR_BAD_REQ_FIELD;
97
98 return REQ_ERR_NO_ERR;
99}
100
101static int handle_request(thread_ctx_t* thread_ctx, int socket)
102{
103 request_t req;
104 const uint8_t* geo;
105 int sent=0;
106 int ret = read(socket, &req, sizeof(req));
107
108 // Socket closed
109 if (ret == 0)
110 {
111 if (thread_ctx->quiet < 0)
112 syslog(LOG_DEBUG, "Socket %d closed", socket);
113 return 1;
114 }
115
116 if (thread_ctx->quiet < 0)
117 syslog(LOG_DEBUG, "New request");
118
119 if (ret != sizeof(req))
120 {
121 if (thread_ctx->quiet < 0)
122 syslog(LOG_DEBUG, "Invalid request size %d", ret);
123
124 return -1;
125 }
126
127 ret = check_request(&req);
128 req.req = REQ_RESP;
129
130 if (ret)
131 {
132 req.err = ret;
133 if (thread_ctx->quiet < 0)
134 syslog(LOG_DEBUG, "Request error %d", ret);
135 }
136 else
137 {
138 if (thread_ctx->quiet < 0)
139 {
140 char dst[64];
141 inet_ntop((req.ip_type == REQ_IPV4)?AF_INET:AF_INET6, req.ip, dst, sizeof(dst));
142 syslog(LOG_DEBUG, "Request for %s from socket %d", dst, socket);
143 }
144
145 geo = ip_to_geo(req.ip, req.ip_type);
146 if (!geo)
147 {
148 req.err = REQ_IP_NOT_FOUND;
149 if (thread_ctx->quiet < 0)
150 syslog(LOG_DEBUG, "Not found");
151 }
152 else
153 {
154 req.err = REQ_ERR_NO_ERR;
155 geo = get_country_code(geo);
156 if (thread_ctx->quiet < 0)
157 syslog(LOG_DEBUG, "Res %s", geo);
158 req.country_code[0] = geo[0];
159 req.country_code[1] = geo[1];
160 req.country_code[2] = 0;
161 req.country_code[3] = 0;
162 }
163 }
164
165 for (sent=0; sent < sizeof(req); sent += ret)
166 {
167 ret = write(socket, &((uint8_t*)&req)[sent], sizeof(req)-sent);
168 if (ret < 0)
169 return -1;
170 }
171
172 return 0;
173}
174
175static void delete_thread(thread_ctx_t* thread_ctx)
176{
177 int i;
178
179 pthread_mutex_lock(&s_fastmutex);
180 thread_ctx->nb_available_sockets = 0;
181
182 if (thread_ctx->quiet < 0)
183 syslog(LOG_DEBUG, "Delete thread %p", thread_ctx);
184
185 for(i=0; i<thread_ctx->nb_cur_sockets; i++)
186 {
187 if (thread_ctx->sockets[i].timeout > 0)
188 {
189 close (thread_ctx->sockets[i].socket);
190 }
191 }
192
193 free(thread_ctx->sockets);
194 free(thread_ctx->pollfds);
195
196 if (thread_ctx->next)
197 thread_ctx->next->prev = thread_ctx->prev;
198 if (thread_ctx->prev)
199 thread_ctx->prev->next = thread_ctx->next;
200
201 if (thread_ctx == s_last_thread)
202 s_last_thread = thread_ctx->next;
203 pthread_mutex_unlock(&s_fastmutex);
204
205 free(thread_ctx);
206}
207
208static inline void close_socket(socket_ctx_t* socket)
209{
210 socket->timeout = -1;
211 close(socket->socket);
212}
213
214#define POLL_ERR_MASK (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL)
215
216static void* thread_loop(void* param)
217{
218 thread_ctx_t* ctx = (thread_ctx_t*)param;
219 int i, ret, nfds, nb_cur_sockets, nb_available_sockets, poll_idx;
220 struct timeval time1, time2, time_res;
221 int wait_time = WAIT_TIME;
222
223 while (!ctx->stop)
224 {
225 nfds = 0;
226
227 pthread_mutex_lock(&ctx->mutex);
228 nb_cur_sockets = ctx->nb_cur_sockets;
229 nb_available_sockets = ctx->nb_available_sockets;
230 pthread_mutex_unlock(&ctx->mutex);
231
232 for(i=0; i<nb_cur_sockets; i++)
233 {
234 if (ctx->sockets[i].timeout > 0)
235 {
236 ctx->pollfds[nfds].fd = ctx->sockets[i].socket;
237 ctx->pollfds[nfds].events = POLLIN|POLL_ERR_MASK;
238 nfds++;
239 }
240 }
241
242 if (!nfds)
243 {
244 /*
245 No more active socket for this thread
246 nor available slots
247 */
248 if (!nb_available_sockets)
249 break;
250
251 if (wait_time < MAX_WAIT_TIME)
252 wait_time += WAIT_TIME;
253
254 usleep(wait_time);
255 continue;
256 }
257 else
258 wait_time = WAIT_TIME;
259
260 gettimeofday(&time1, NULL);
261 ret = poll(ctx->pollfds, nfds, ctx->max_timeout);
262 gettimeofday(&time2, NULL);
263
264 // Timeout, remove all current sockets
265 if (ret == 0)
266 {
267 if (ctx->quiet < 0)
268 syslog(LOG_DEBUG, "Timeout");
269
270 for(i=0; i<nb_cur_sockets; i++)
271 {
272 if (ctx->sockets[i].timeout > 0)
273 close_socket(&ctx->sockets[i]);
274 }
275 }
276 else if (ret < 0)
277 {
278 if (!s_stop && !ctx->stop)
279 syslog(LOG_WARNING, "poll has errors (%m)\n");
280 break;
281 }
282 else
283 {
284 timersub(&time2, &time1, &time_res);
285 poll_idx = -1;
286 for(i=0; i<nb_cur_sockets; i++)
287 {
288 if (ctx->sockets[i].timeout < 0) continue;
289 poll_idx++;
290 if (ctx->pollfds[poll_idx].fd != ctx->sockets[i].socket)
291 {
292 if (ctx->quiet < 0)
293 syslog(LOG_ERR, "Socket not found but present in poll fds");
294 continue;
295 }
296
297 // Error
298 if (ctx->pollfds[poll_idx].revents & POLL_ERR_MASK)
299 {
300 if (ctx->quiet < 0)
301 syslog(LOG_ERR, "Error (or closed) socket %d", ctx->sockets[i].socket);
302 close_socket(&ctx->sockets[i]);
303 }
304 // Someone is speaking
305 else if (ctx->pollfds[poll_idx].revents & POLLIN)
306 {
307 ctx->sockets[i].timeout = ctx->max_timeout*1000;
308 ret = handle_request(ctx, ctx->sockets[i].socket);
309 if (ret == 1)
310 {
311 if (ctx->quiet < 0)
312 syslog(LOG_DEBUG, "Client has closed socket %d",
313 ctx->sockets[i].socket);
314 close_socket(&ctx->sockets[i]);
315 }
316 // No more requests accepted
317 else if (!ctx->sockets[i].nb_remaining_requests--)
318 {
319 if (ctx->quiet < 0)
320 syslog(LOG_DEBUG, "Max requests reached for socket %d",
321 ctx->sockets[i].socket);
322 syncfs(ctx->sockets[i].socket);
323 close_socket(&ctx->sockets[i]);
324 }
325 }
326 else
327 {
328 ctx->sockets[i].timeout -= (time_res.tv_sec*1000000 + time_res.tv_usec);
329 if (ctx->sockets[i].timeout <= 0)
330 close_socket(&ctx->sockets[i]);
331 }
332 }
333 }
334 };
335
336 delete_thread(ctx);
337
338 pthread_exit(NULL);
339
340 return NULL;
341}
342
343static inline thread_ctx_t* create_thread_ctx(struct gengetopt_args_info* params)
344{
345 thread_ctx_t* thread_ctx = malloc(sizeof(*thread_ctx));
346
347 if (params->verbose_flag)
348 syslog(LOG_DEBUG, "Create a new thread %p", thread_ctx);
349
350 thread_ctx->sockets = malloc(sizeof(*thread_ctx->sockets)*params->sockets_per_thread_arg);
351 thread_ctx->pollfds = malloc(sizeof(*thread_ctx->pollfds)*params->sockets_per_thread_arg);
352 thread_ctx->nb_cur_sockets = 0;
353 thread_ctx->nb_available_sockets = params->sockets_per_thread_arg;
354 thread_ctx->max_timeout = params->sockets_timeout_arg*1000;
355 thread_ctx->stop = 0;
356 thread_ctx->quiet = params->quiet_flag;
357 if (params->verbose_flag)
358 thread_ctx->quiet = -1;
359 thread_ctx->prev = NULL;
360 pthread_mutex_init(&thread_ctx->mutex, NULL);
361
362 thread_ctx->next = s_last_thread;
363 if (s_last_thread)
364 s_last_thread->prev = thread_ctx;
365 s_last_thread = thread_ctx;
366
367 return thread_ctx;
368}
369
370static void fill_new_socket(struct gengetopt_args_info* params, int socket)
371{
372 thread_ctx_t* thread_ctx;
373 int launch_thread = 0;
374
375 pthread_mutex_lock(&s_fastmutex);
376
377 thread_ctx = s_last_thread;
378 if (!thread_ctx || !thread_ctx->nb_available_sockets)
379 {
380 thread_ctx = create_thread_ctx(params);
381 launch_thread = 1;
382 }
383
384 pthread_mutex_unlock(&s_fastmutex);
385
386 thread_ctx->sockets[thread_ctx->nb_cur_sockets].socket = socket;
387 thread_ctx->sockets[thread_ctx->nb_cur_sockets].timeout = thread_ctx->max_timeout*1000; // ms -> us
388 thread_ctx->sockets[thread_ctx->nb_cur_sockets].nb_remaining_requests = params->client_max_requests_arg;
389
390 pthread_mutex_lock(&thread_ctx->mutex);
391 thread_ctx->nb_cur_sockets++;
392 thread_ctx->nb_available_sockets--;
393 pthread_mutex_unlock(&thread_ctx->mutex);
394
395 if (launch_thread)
396 pthread_create(&thread_ctx->thread, NULL, thread_loop, thread_ctx);
397}
398
399int daemonize(struct gengetopt_args_info* params)
400{
401 int ret;
402 struct sockaddr_in sockaddr;
403 socklen_t sockaddr_len;
404 int new_socket;
405 void* thread_ret;
406
407 // Should have both ipv4 & ipv6
408 s_server_socket = socket(AF_INET, SOCK_STREAM, 0); // Should have both TCP & UDP
409
410 if (!s_server_socket)
411 {
412 if (!params->quiet_flag)
413 fprintf(stderr, "Unable to create socket (%m)\n");
414 return -1;
415 }
416
417 memset(&sockaddr, 0, sizeof(sockaddr));
418 sockaddr.sin_family = AF_INET; // Should detect interface type (v4 or v6)
419 sockaddr.sin_port = htons(params->port_arg);
420 if (params->bind_ip_given)
421 {
422 ret = inet_aton(params->bind_ip_arg, &sockaddr.sin_addr);
423 if (ret)
424 {
425 if (!params->quiet_flag)
426 fprintf(stderr, "Error with bind address %s (%m)\n", params->bind_ip_arg);
427 return -1;
428 }
429 }
430 else
431 sockaddr.sin_addr.s_addr = INADDR_ANY;
432
433 ret = bind(s_server_socket, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
434 if (ret)
435 {
436 if (!params->quiet_flag)
437 fprintf(stderr, "Unable to bind (%m)\n");
438 return -2;
439 }
440
441 ret = listen(s_server_socket, 0);
442 if (ret)
443 {
444 if (!params->quiet_flag)
445 fprintf(stderr, "Unable to listen (%m)\n");
446 return -3;
447 }
448
449 if (!params->no_background_flag)
450 {
451 ret = daemon(0, 0);
452 if (ret)
453 {
454 if (!params->quiet_flag)
455 fprintf(stderr, "Daemon error (%m)\n");
456 return -4;
457 }
458 }
459
460 openlog("ip_to_geod", 0, LOG_DAEMON);
461
462 syslog(LOG_INFO, "ip_togeod started\n");
463
464 signal(SIGINT, sigint);
465 signal(SIGUSR1, sigint);
466 signal(SIGUSR2, sigint);
467
468#ifdef USE_SECCOMP
469 scmp_filter_ctx seccomp_ctx = seccomp_init(SCMP_ACT_KILL);
470
471 if (seccomp_ctx == NULL)
472 {
473 syslog(LOG_ERR, "unable to initialize seccomp\n");
474 return -5;
475 }
476
477 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(read), 0);
478 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(write), 0);
479 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(close), 0);
480 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(accept), 0);
481#endif
482
483 while (!s_stop)
484 {
485 sockaddr_len = sizeof(sockaddr);
486 new_socket = accept(s_server_socket, (struct sockaddr *) &sockaddr, &sockaddr_len);
487 if (new_socket < 0)
488 {
489 if (!s_stop)
490 syslog(LOG_ERR, "accept error (%m), exiting");
491 break;
492 }
493 if (!params->quiet_flag)
494 syslog(LOG_INFO, "new connection from %s, socket %d",
495 inet_ntoa(sockaddr.sin_addr), new_socket);
496 fill_new_socket(params, new_socket);
497 }
498
499 close(s_server_socket);
500
501 while (s_last_thread)
502 {
503 s_last_thread->stop = 1;
504 pthread_join(s_last_thread->thread, &thread_ret);
505 }
506
507 closelog();
508
509#ifdef USE_SECCOMP
510 if (seccomp_ctx)
511 seccomp_release(seccomp_ctx);
512#endif
513
514 return 0;
515}
516

Archive Download this file

Branches

Tags