IP to Geo

IP to Geo Git Source Tree

Root/tests/iptogeo.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copyright 2016 Grégory Soutadé
#
# This file is part of iptogeo.
#
# iptogeo is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# iptogeo is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with iptogeo. If not, see <http://www.gnu.org/licenses/>.
#

import socket
import struct
25
class IPToGeoException(Exception):
    """Error raised by the IPToGeo client.

    Raised when a response carries an unexpected magic number or when the
    server reports an error code other than "IP not found".
    """
28
class IPToGeo(object):
    """TCP client for the iptogeo binary protocol.

    Every request and response is a fixed PACKET_SIZE (32 byte) packet:

    * 12 byte header packed as '<IBBBBI': magic, version, request flag,
      error code, IP type (4 or 16) and flags;
    * 16 bytes of address (an IPv4 address uses the first 4 bytes, the
      rest is zero padding);
    * 4 bytes of country code.

    The connection is opened lazily on the first request and re-created
    every MAX_REQUESTS requests.
    """

    MAGIC = 0x179E08EF
    VERSION = 1
    REQ = 1
    RESP = 0
    IPV4 = 4
    IPV6 = 16

    IP_NOT_FOUND = 6

    PACKET_SIZE = 32

    ERRORS = {1: 'Bad magic',
              2: 'Bad version',
              3: 'Bad request field',
              4: 'Bad IP version',
              5: 'Unsupported IP version',
              6: 'IP not found'}

    MAX_REQUESTS = 50

    def __init__(self, remote_addr='127.0.0.1', remote_port=53333,
                 timeout=None, family=socket.AF_INET):
        """Store connection parameters; no socket is opened here.

        remote_addr / remote_port -- address of the iptogeo server.
        timeout -- socket timeout in seconds, or None to leave the
                   socket's default timeout untouched.
        family -- socket address family used to reach the server.
        """
        self._remote_addr = remote_addr
        self._remote_port = remote_port
        self._timeout = timeout
        self._family = family
        self._nb_requests_sent = self.MAX_REQUESTS  # Force socket creation
        self._socket = None

    def _create_socket(self):
        """(Re)open the TCP connection, closing any previous socket."""
        if self._socket:
            self._socket.close()
            self._socket = None
        sock = socket.socket(self._family, socket.SOCK_STREAM)
        try:
            if self._timeout is not None:
                sock.settimeout(self._timeout)
            sock.connect((self._remote_addr, self._remote_port))
        except Exception:
            # Do not leak the descriptor when the connection fails.
            sock.close()
            raise
        self._socket = sock

    def _extend_ipv6(self, ipv6):
        """Expand a textual IPv6 address to 16 colon separated hex bytes.

        '2001:db8::1' becomes '20:01:0d:b8:00:...:00:01'. A '::' is
        replaced by as many zero groups as needed to reach 8 groups. An
        address without '::' that has fewer than 8 groups is padded with
        trailing zero groups (kept for backward compatibility).

        Raises IPToGeoException when the number of groups or the length
        of a group is invalid.
        """
        head, sep, tail = ipv6.partition('::')
        groups = head.split(':') if head else []
        if sep:
            tail_groups = tail.split(':') if tail else []
            missing = 8 - len(groups) - len(tail_groups)
            if missing < 1:
                raise IPToGeoException('Bad IP %s' % (ipv6))
            groups = groups + ['0'] * missing + tail_groups
        elif len(groups) > 8:
            raise IPToGeoException('Bad IP %s' % (ipv6))

        for group in groups:
            # Also rejects the empty groups produced by a stray ':'.
            if not 1 <= len(group) <= 4:
                raise IPToGeoException('Bad IP %s' % (ipv6))

        digits = ''.join([group.rjust(4, '0') for group in groups])
        digits = digits.ljust(16 * 2, '0')
        return ':'.join([digits[i:i + 2] for i in range(0, 16 * 2, 2)])

    def _create_request(self, ip, ip_type):
        """Build the PACKET_SIZE byte request packet.

        ip -- sequence of address bytes (4 for IPv4, 16 for IPv6), most
              significant byte first.
        ip_type -- IPToGeo.IPV4 or IPToGeo.IPV6.

        Raises IPToGeoException if more than 16 address bytes are given,
        and struct.error if a byte is outside 0..255.
        """
        if len(ip) > 16:
            raise IPToGeoException('Bad IP length %d' % (len(ip)))
        header = struct.pack('<IBBBBI', IPToGeo.MAGIC, IPToGeo.VERSION,
                             IPToGeo.REQ,
                             0,        # err
                             ip_type,  # ip type
                             0)        # flags
        # The address field is always 16 bytes: pad up to that size so
        # that IPv4 and IPv6 requests have the same length.
        address = struct.pack('<%dB' % len(ip), *ip).ljust(16, b'\0')
        country_code = struct.pack('<I', 0)

        return header + address + country_code

    def _check_request(self, packet):
        """Validate a response packet and extract its content.

        Returns (ip_hex, country_code). ip_hex is built from the address
        words decoded as little-endian 32 bit integers (8 hex digits for
        IPv4, 32 for IPv6). country_code is the 4 character country code
        field, or None when the server answered "IP not found".
        NOTE(review): if the server echoes the address bytes unchanged,
        each 32 bit word in ip_hex appears byte-reversed -- confirm
        against the server before relying on this value.

        Raises IPToGeoException on truncated packet, bad magic or any
        other server error.
        """
        if len(packet) < IPToGeo.PACKET_SIZE:
            raise IPToGeoException('Truncated response (%d bytes)' % (len(packet)))

        (magic, _version, _req, err, ip_type, _flags,
         ipv4, ipv6b, ipv6c, ipv6d) = struct.unpack_from('<IBBBBIIIII', packet, 0)

        if magic != IPToGeo.MAGIC:
            raise IPToGeoException('Invalid magic %08x' % (magic))

        ip_res = '%08x' % (ipv4)
        if ip_type == IPToGeo.IPV6:
            ip_res += '%08x' % (ipv6b)
            ip_res += '%08x' % (ipv6c)
            ip_res += '%08x' % (ipv6d)

        if err == IPToGeo.IP_NOT_FOUND:
            return (ip_res, None)  # IP not found
        if err != 0:
            raise IPToGeoException(IPToGeo.ERRORS.get(err, 'Unknown error %d' % (err)))

        (cc0, cc1, cc2, cc3) = struct.unpack_from('BBBB', packet, 7 * 4)

        return (ip_res, '%c%c%c%c' % (cc0, cc1, cc2, cc3))

    def _recv_exact(self, size):
        """Read exactly size bytes from the socket.

        A connection closed by the peer is reported as socket.timeout so
        that the caller's reconnect-and-retry logic handles it.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self._socket.recv(remaining)
            if not chunk:
                raise socket.timeout
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _send_request(self, packet, second_chance=True):
        """Send a request packet and return the raw response packet.

        The socket is re-created every MAX_REQUESTS requests. On timeout
        (or closed connection) the request is retried once on a fresh
        connection when second_chance is true; otherwise socket.timeout
        is propagated.
        """
        self._nb_requests_sent += 1
        if self._nb_requests_sent >= self.MAX_REQUESTS:
            self._create_socket()
            self._nb_requests_sent = 0
        try:
            self._socket.sendall(packet)
            # Keep the request in 'packet' so a retry re-sends it intact.
            return self._recv_exact(IPToGeo.PACKET_SIZE)
        except socket.timeout:
            if not second_chance:
                raise
            self._nb_requests_sent = self.MAX_REQUESTS  # Force reconnection
            return self._send_request(packet, False)

    def ip_to_geo(self, ip):
        """Look up the country of a textual IPv4 or IPv6 address.

        Returns (ip_hex, country_code) where ip_hex is the address as
        decoded from the response (see _check_request) and country_code
        is the first two characters of the country code field, or None
        when the address is unknown to the server.

        Raises IPToGeoException (an Exception subclass) for a malformed
        address, ValueError for non numeric components.
        """
        ip_type = IPToGeo.IPV4
        if '.' in ip:
            splitted_ip = [int(a) for a in ip.split('.')]
            if len(splitted_ip) != 4:
                raise IPToGeoException('Bad IP %s' % (ip))
        elif ':' in ip:
            splitted_ip = [int(a, 16) for a in self._extend_ipv6(ip).split(':')]
            if len(splitted_ip) != 16:
                raise IPToGeoException('Bad IP %s' % (ip))
            ip_type = IPToGeo.IPV6
        else:
            raise IPToGeoException('Bad IP %s' % (ip))

        packet = self._create_request(splitted_ip, ip_type)
        packet = self._send_request(packet)
        (ip, country_code) = self._check_request(packet)
        if country_code:
            # Only the first two characters of the field are kept.
            country_code = country_code[:2]
        return (ip, country_code)

    def close(self):
        """Close the connection, if any; the next request reconnects."""
        if self._socket is not None:
            self._socket.close()
            self._socket = None
        self._nb_requests_sent = self.MAX_REQUESTS  # Force socket creation

Archive Download this file

Branches

Tags