IP to Geo

IP to Geo Git Source Tree

Root/src/server.c

1#define _GNU_SOURCE 1 // for POLLRDHUP && syncfs
2#include <sys/types.h>
3#include <sys/socket.h>
4#include <sys/select.h>
5#include <sys/time.h>
6#include <arpa/inet.h>
7#include <netinet/in.h>
8#include <time.h>
9#include <syslog.h>
10#include <signal.h>
11#include <stdio.h>
12#include <unistd.h>
13#include <pthread.h>
14#include <stdlib.h>
15#include <string.h>
16#include <poll.h>
17
18#ifdef USE_SECCOMP
19#include <seccomp.h>
20#endif
21
22#include "ip_to_geo.h"
23#include "protocol.h"
24
/*
 * Idle back-off used by thread_loop() when a worker has no socket to poll.
 * Both values are handed to usleep(), i.e. they are microseconds.
 */
enum
{
    WAIT_TIME     = 100, /* initial sleep, and increment added on each idle pass */
    MAX_WAIT_TIME = 500  /* the sleep stops growing once it reaches this value */
};
27
/*
 * State kept for one accepted client connection.
 */
typedef struct socket_ctx_s
{
    /* File descriptor returned by accept(). */
    int socket;

    /*
     * Remaining inactivity budget in microseconds. A value > 0 means the
     * slot is live; close_socket() sets it to -1 once the descriptor has
     * been closed.
     */
    time_t timeout;

    /* Request counter, decremented after each handled request. */
    int nb_remaining_requests;
} socket_ctx_t;
33
34typedef struct thread_ctx_s{
35 struct thread_ctx_s* prev;
36 struct thread_ctx_s* next;
37 pthread_t thread;
38 socket_ctx_t* sockets;
39 int nb_cur_sockets;
40 int nb_available_sockets;
41 int max_timeout;
42 int max_sockets;
43 int stop;
44 int quiet;
45 pthread_mutex_t mutex;
46 struct pollfd * pollfds;
47} thread_ctx_t;
48
49static pthread_mutex_t s_fastmutex = PTHREAD_MUTEX_INITIALIZER;
50static thread_ctx_t* s_last_thread = NULL;
51static int s_server_socket = -1;
52static int s_stop = 0;
53
54void sigint(int sig)
55{
56 syslog(LOG_WARNING, "signal received, stopping threads");
57 s_stop = 1;
58 shutdown(s_server_socket, SHUT_RDWR);
59}
60
61static int check_request(request_t* req)
62{
63 if (req->magic != REQ_MAGIC)
64 return REQ_ERR_BAD_MAGIC;
65
66 if (req->version != REQ_VERSION)
67 return REQ_ERR_BAD_VERSION;
68
69 if (req->ip_type != REQ_IPV4 &&
70 req->ip_type != REQ_IPV6)
71 return REQ_ERR_BAD_IP_VERSION;
72
73 if (req->ip_type != REQ_IPV4)
74 return REQ_ERR_UNSUPPORTED_IP_VERSION;
75
76 if (req->req != REQ_REQ)
77 return REQ_ERR_BAD_REQ_FIELD;
78
79 return REQ_ERR_NO_ERR;
80}
81
82static int handle_request(thread_ctx_t* thread_ctx, int socket)
83{
84 request_t req;
85 const uint8_t* geo;
86 int sent=0;
87 int ret = read(socket, &req, sizeof(req));
88
89 // Socket closed
90 if (ret == 0)
91 {
92 if (thread_ctx->quiet < 0)
93 syslog(LOG_DEBUG, "Socket %d closed", socket);
94 return 1;
95 }
96
97 if (thread_ctx->quiet < 0)
98 syslog(LOG_DEBUG, "New request");
99
100 if (ret != sizeof(req))
101 {
102 if (thread_ctx->quiet < 0)
103 syslog(LOG_DEBUG, "Invalid request size %d", ret);
104
105 return -1;
106 }
107
108 ret = check_request(&req);
109 req.req = REQ_RESP;
110
111 if (ret)
112 {
113 req.err = ret;
114 if (thread_ctx->quiet < 0)
115 syslog(LOG_DEBUG, "Request error %d", ret);
116 }
117 else
118 {
119 if (thread_ctx->quiet < 0)
120 syslog(LOG_DEBUG, "Request for %08x from socket %d", req.ipv4, socket);
121 geo = ip_to_geo(req.ipv4);
122 if (!geo)
123 {
124 req.err = REQ_IP_NOT_FOUND;
125 if (thread_ctx->quiet < 0)
126 syslog(LOG_DEBUG, "Not found");
127 }
128 else
129 {
130 req.err = REQ_ERR_NO_ERR;
131 geo = get_country_code(geo);
132 if (thread_ctx->quiet < 0)
133 syslog(LOG_DEBUG, "Res %s", geo);
134 req.country_code[0] = geo[0];
135 req.country_code[1] = geo[1];
136 req.country_code[2] = 0;
137 req.country_code[3] = 0;
138 }
139 }
140
141 for (sent=0; sent < sizeof(req); sent += ret)
142 {
143 ret = write(socket, &((uint8_t*)&req)[sent], sizeof(req)-sent);
144 if (ret < 0)
145 return -1;
146 }
147
148 return 0;
149}
150
151static void delete_thread(thread_ctx_t* thread_ctx)
152{
153 int i;
154
155 pthread_mutex_lock(&s_fastmutex);
156 thread_ctx->nb_available_sockets = 0;
157
158 if (thread_ctx->quiet < 0)
159 syslog(LOG_DEBUG, "Delete thread %p", thread_ctx);
160
161 for(i=0; i<thread_ctx->nb_cur_sockets; i++)
162 {
163 if (thread_ctx->sockets[i].timeout > 0)
164 {
165 close (thread_ctx->sockets[i].socket);
166 }
167 }
168
169 free(thread_ctx->sockets);
170 free(thread_ctx->pollfds);
171
172 if (thread_ctx->next)
173 thread_ctx->next->prev = thread_ctx->prev;
174 if (thread_ctx->prev)
175 thread_ctx->prev->next = thread_ctx->next;
176
177 if (thread_ctx == s_last_thread)
178 s_last_thread = thread_ctx->next;
179 pthread_mutex_unlock(&s_fastmutex);
180
181 free(thread_ctx);
182}
183
184static inline void close_socket(socket_ctx_t* socket)
185{
186 socket->timeout = -1;
187 close(socket->socket);
188}
189
190#define POLL_ERR_MASK (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL)
191
192static void* thread_loop(void* param)
193{
194 thread_ctx_t* ctx = (thread_ctx_t*)param;
195 int i, ret, nfds, nb_cur_sockets, nb_available_sockets, poll_idx;
196 struct timeval time1, time2, time_res;
197 int wait_time = WAIT_TIME;
198
199 while (!ctx->stop)
200 {
201 nfds = 0;
202
203 pthread_mutex_lock(&ctx->mutex);
204 nb_cur_sockets = ctx->nb_cur_sockets;
205 nb_available_sockets = ctx->nb_available_sockets;
206 pthread_mutex_unlock(&ctx->mutex);
207
208 for(i=0; i<nb_cur_sockets; i++)
209 {
210 if (ctx->sockets[i].timeout > 0)
211 {
212 ctx->pollfds[nfds].fd = ctx->sockets[i].socket;
213 ctx->pollfds[nfds].events = POLLIN|POLL_ERR_MASK;
214 nfds++;
215 }
216 }
217
218 if (!nfds)
219 {
220 /*
221 No more active socket for this thread
222 nor available slots
223 */
224 if (!nb_available_sockets)
225 break;
226
227 if (wait_time < MAX_WAIT_TIME)
228 wait_time += WAIT_TIME;
229
230 usleep(wait_time);
231 continue;
232 }
233 else
234 wait_time = WAIT_TIME;
235
236 gettimeofday(&time1, NULL);
237 ret = poll(ctx->pollfds, nfds, ctx->max_timeout);
238 gettimeofday(&time2, NULL);
239
240 // Timeout, remove all current sockets
241 if (ret == 0)
242 {
243 if (ctx->quiet < 0)
244 syslog(LOG_DEBUG, "Timeout");
245
246 for(i=0; i<nb_cur_sockets; i++)
247 {
248 if (ctx->sockets[i].timeout > 0)
249 close_socket(&ctx->sockets[i]);
250 }
251 }
252 else if (ret < 0)
253 {
254 if (!s_stop && !ctx->stop)
255 syslog(LOG_WARNING, "poll has errors (%m)\n");
256 break;
257 }
258 else
259 {
260 timersub(&time2, &time1, &time_res);
261 poll_idx = -1;
262 for(i=0; i<ctx->nb_cur_sockets; i++)
263 {
264 if (ctx->sockets[i].timeout < 0) continue;
265 poll_idx++;
266 if (ctx->pollfds[poll_idx].fd != ctx->sockets[i].socket)
267 {
268 if (ctx->quiet < 0)
269 syslog(LOG_ERR, "Socket not found but present in poll fds");
270 continue;
271 }
272
273 // Error
274 if (ctx->pollfds[poll_idx].revents & POLL_ERR_MASK)
275 {
276 if (ctx->quiet < 0)
277 syslog(LOG_ERR, "Error with socket %d", ctx->sockets[i].socket);
278 close_socket(&ctx->sockets[i]);
279 }
280 // Someone is speaking
281 else if (ctx->pollfds[poll_idx].revents & POLLIN)
282 {
283 ctx->sockets[i].timeout = ctx->max_timeout*1000;
284 ret = handle_request(ctx, ctx->sockets[i].socket);
285 if (ret == 1)
286 {
287 if (ctx->quiet < 0)
288 syslog(LOG_DEBUG, "Client has closed socket %d",
289 ctx->sockets[i].socket);
290 close_socket(&ctx->sockets[i]);
291 }
292 // No more requests accepted
293 else if (!ctx->sockets[i].nb_remaining_requests--)
294 {
295 if (ctx->quiet < 0)
296 syslog(LOG_DEBUG, "Max requests reached for socket %d",
297 ctx->sockets[i].socket);
298 syncfs(ctx->sockets[i].socket);
299 close_socket(&ctx->sockets[i]);
300 }
301 }
302 else
303 {
304 ctx->sockets[i].timeout -= (time_res.tv_sec*1000000 + time_res.tv_usec);
305 if (ctx->sockets[i].timeout <= 0)
306 close_socket(&ctx->sockets[i]);
307 }
308 }
309 }
310 };
311
312 delete_thread(ctx);
313
314 pthread_exit(NULL);
315
316 return NULL;
317}
318
319static inline thread_ctx_t* create_thread_ctx(struct gengetopt_args_info* params)
320{
321 thread_ctx_t* thread_ctx = malloc(sizeof(*thread_ctx));
322
323 if (params->verbose_flag)
324 syslog(LOG_DEBUG, "Create a new thread %p", thread_ctx);
325
326 thread_ctx->sockets = malloc(sizeof(*thread_ctx->sockets)*params->sockets_per_thread_arg);
327 thread_ctx->pollfds = malloc(sizeof(*thread_ctx->pollfds)*params->sockets_per_thread_arg);
328 thread_ctx->nb_cur_sockets = 0;
329 thread_ctx->nb_available_sockets = params->sockets_per_thread_arg;
330 thread_ctx->max_timeout = params->sockets_timeout_arg*1000;
331 thread_ctx->stop = 0;
332 thread_ctx->quiet = params->quiet_flag;
333 if (params->verbose_flag)
334 thread_ctx->quiet = -1;
335 thread_ctx->prev = NULL;
336 pthread_mutex_init(&thread_ctx->mutex, NULL);
337
338 thread_ctx->next = s_last_thread;
339 if (s_last_thread)
340 s_last_thread->prev = thread_ctx;
341 else
342 s_last_thread = thread_ctx;
343
344 return thread_ctx;
345}
346
347static void fill_new_socket(struct gengetopt_args_info* params, int socket)
348{
349 thread_ctx_t* thread_ctx;
350 int launch_thread = 0;
351
352 pthread_mutex_lock(&s_fastmutex);
353
354 thread_ctx = s_last_thread;
355 if (!thread_ctx || !thread_ctx->nb_available_sockets)
356 {
357 thread_ctx = create_thread_ctx(params);
358 launch_thread = 1;
359 }
360
361 pthread_mutex_unlock(&s_fastmutex);
362
363 thread_ctx->sockets[thread_ctx->nb_cur_sockets].socket = socket;
364 thread_ctx->sockets[thread_ctx->nb_cur_sockets].timeout = thread_ctx->max_timeout*1000; // ms -> us
365 thread_ctx->sockets[thread_ctx->nb_cur_sockets].nb_remaining_requests = params->client_max_requests_arg;
366
367 pthread_mutex_lock(&thread_ctx->mutex);
368 thread_ctx->nb_cur_sockets++;
369 thread_ctx->nb_available_sockets--;
370 pthread_mutex_unlock(&thread_ctx->mutex);
371
372 if (launch_thread)
373 pthread_create(&thread_ctx->thread, NULL, thread_loop, thread_ctx);
374}
375
376int daemonize(struct gengetopt_args_info* params)
377{
378 int ret;
379 struct sockaddr_in sockaddr;
380 socklen_t sockaddr_len;
381 int new_socket;
382 void* thread_ret;
383
384 // Should have both ipv4 & ipv6
385 s_server_socket = socket(AF_INET, SOCK_STREAM, 0); // Should have both TCP & UDP
386
387 if (!s_server_socket)
388 {
389 if (!params->quiet_flag)
390 fprintf(stderr, "Unable to create socket (%m)\n");
391 return -1;
392 }
393
394 memset(&sockaddr, 0, sizeof(sockaddr));
395 sockaddr.sin_family = AF_INET; // Should detect interface type (v4 or v6)
396 sockaddr.sin_port = htons(params->port_arg);
397 if (params->bind_ip_given)
398 {
399 ret = inet_aton(params->bind_ip_arg, &sockaddr.sin_addr);
400 if (ret)
401 {
402 if (!params->quiet_flag)
403 fprintf(stderr, "Error with bind address %s (%m)\n", params->bind_ip_arg);
404 return -1;
405 }
406 }
407 else
408 sockaddr.sin_addr.s_addr = INADDR_ANY;
409
410 ret = bind(s_server_socket, (struct sockaddr *)&sockaddr, sizeof(sockaddr));
411 if (ret)
412 {
413 if (!params->quiet_flag)
414 fprintf(stderr, "Unable to bind (%m)\n");
415 return -2;
416 }
417
418 ret = listen(s_server_socket, 0);
419 if (ret)
420 {
421 if (!params->quiet_flag)
422 fprintf(stderr, "Unable to listen (%m)\n");
423 return -3;
424 }
425
426 if (!params->no_background_flag)
427 {
428 ret = daemon(0, 0);
429 if (ret)
430 {
431 if (!params->quiet_flag)
432 fprintf(stderr, "Daemon error (%m)\n");
433 return -4;
434 }
435 }
436
437 openlog("ip_to_geod", 0, LOG_DAEMON);
438
439 syslog(LOG_INFO, "ip_togeod started\n");
440
441 signal(SIGINT, sigint);
442 signal(SIGUSR1, sigint);
443 signal(SIGUSR2, sigint);
444
445#ifdef USE_SECCOMP
446 scmp_filter_ctx seccomp_ctx = seccomp_init(SCMP_ACT_KILL);
447
448 if (seccomp_ctx == NULL)
449 {
450 syslog(LOG_ERR, "unable to initialize seccomp\n");
451 return -5;
452 }
453
454 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(read), 0);
455 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(write), 0);
456 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(close), 0);
457 seccomp_rule_add(seccomp_ctx, SCMP_ACT_ALLOW, SCMP_SYS(accept), 0);
458#endif
459
460 while (!s_stop)
461 {
462 sockaddr_len = sizeof(sockaddr);
463 new_socket = accept(s_server_socket, (struct sockaddr *) &sockaddr, &sockaddr_len);
464 if (new_socket < 0)
465 {
466 if (!s_stop)
467 syslog(LOG_ERR, "accept error (%m), exiting");
468 break;
469 }
470 if (!params->quiet_flag)
471 syslog(LOG_INFO, "new connection from %s, socket %d",
472 inet_ntoa(sockaddr.sin_addr), new_socket);
473 fill_new_socket(params, new_socket);
474 }
475
476 close(s_server_socket);
477
478 while (s_last_thread)
479 {
480 s_last_thread->stop = 1;
481 pthread_join(s_last_thread->thread, &thread_ret);
482 }
483
484 closelog();
485
486#ifdef USE_SECCOMP
487 if (seccomp_ctx)
488 seccomp_release(seccomp_ctx);
489#endif
490
491 return 0;
492}
493

Archive Download this file

Branches

Tags