IP to Geo

IP to Geo Git Source Tree

Root/tests/iptogeo.py

1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import socket
5import struct
6
7class IPToGeoException(Exception):
8 pass
9
10class IPToGeo(object):
11
12 MAGIC = 0x179E08EF
13 VERSION = 1
14 REQ = 1
15 RESP = 0
16 IPV4 = 32
17 IPV6 = 128
18
19 IP_NOT_FOUND = 6
20
21 PACKET_SIZE = 32
22
23 ERRORS = {1 : 'Bad magic',
24 2 : 'Bad version',
25 3 : 'Bad request field' ,
26 4 : 'Bad IP version',
27 5 : 'Unsupported IP version',
28 6 : 'IP not found'}
29
30 def __init__(self, remote_addr='127.0.0.1', remote_port=53333, timeout=None):
31 self._remote_addr = remote_addr
32 self._remote_port = remote_port
33 self._timeout = timeout
34
35 self._create_socket()
36
37 def _create_socket(self):
38 self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
39 if not self._timeout is None:
40 self._socket.settimeout(self._timeout)
41 self._socket.connect((self._remote_addr, self._remote_port))
42
43 def _create_request(self, int_ip):
44 packet = ''
45 packet += struct.pack('<IBBBBI', IPToGeo.MAGIC, IPToGeo.VERSION, IPToGeo.REQ,
46 0, #err
47 IPToGeo.IPV4, # ip type
48 0) # flags
49 packet += struct.pack('<I', int_ip) # ipv4
50 packet += struct.pack('<III', 0, 0, 0) # ipv6
51 packet += struct.pack('<I', 0) # country code
52
53 return packet
54
55 def _check_request(self, packet):
56 (magic, version, req, err, ip_type, flags, ipv4, ipv6b, ipv6c, ipv6d) = struct.unpack_from('<IBBBBIIIII', packet, 0)
57
58 if magic != IPToGeo.MAGIC:
59 raise IPToGeoException('Invalid magic %08x' % (magic))
60
61 if err == IPToGeo.IP_NOT_FOUND: return (ipv4, None) # IP not found
62 if err != 0:
63 raise IPToGeoException(IPToGeo.ERRORS[err])
64
65 (cc0, cc1, cc2, cc3) = struct.unpack_from('BBBB', packet, 7*4)
66
67 return (ipv4, '%c%c%c%c' % (cc0, cc1, cc2, cc3))
68
69 def ip_to_geo(self, ip):
70 splitted_ip = [int(a) for a in ip.split('.')]
71 int_ip = splitted_ip[0] << 24
72 int_ip |= splitted_ip[1] << 16
73 int_ip |= splitted_ip[2] << 8
74 int_ip |= splitted_ip[3] << 0
75
76 packet = self._create_request(int_ip)
77 try:
78 self._socket.send(packet)
79 except IOError, e:
80 # Give another chance (we may have been disconnected due to timeout)
81 self._create_socket()
82 self._socket.send(packet)
83 packet = self._socket.recv(IPToGeo.PACKET_SIZE)
84 if not packet:
85 raise IPToGeoException('Error, empty packet')
86 (ip, country_code) = self._check_request(packet)
87 if country_code:
88 # convert to string
89 country_code = '%c%c' % (country_code[0], country_code[1])
90 return (ip, country_code)
91
92 def close(self):
93 self._socket.close()

Archive Download this file

Branches

Tags