
Proxy Pool

A proxy pool is indispensable gear for crawling, scraping, brute-forcing, order spamming, and similar tasks. I read through the source of a Python proxy pool project on GitHub; it is simple to use, maintenance-free, and needs hardly any configuration, so I'm sharing it here.

Structure

The program crawls proxy lists from websites and stores them in a SQLite database. A crawl -> store -> check -> crawl loop runs on a timer to keep the collected proxy IPs usable. Meanwhile the program listens for HTTP requests locally, filters proxies by the GET parameters in the request, and returns them to the application as JSON. The gray outlined box in the diagram is the part the proxy pool program itself implements; the arrows show how proxy IPs flow through the system. (architecture diagram)
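The excerpts below never show the table definition, but the queries reference ip, port, type, protocol, area, speed, score, updatetime and lastusedtime, so the proxy table presumably looks roughly like this (a sketch inferred from the code, not the repo's actual schema):

import sqlite3

# hypothetical schema reconstructed from the columns the queries touch
conn = sqlite3.connect('proxy.db')
conn.execute('''CREATE TABLE IF NOT EXISTS proxy (
    ip           TEXT,
    port         TEXT,
    type         INTEGER,  -- anonymity level (assumed meaning)
    protocol     INTEGER,  -- http/https (assumed meaning)
    area         TEXT,     -- geographic region
    speed        REAL,     -- response time measured during validation
    score        REAL,     -- quality score
    updatetime   TEXT,     -- when the IP was (re)stored: defines its batch
    lastusedtime TEXT      -- refreshed each time the API serves the IP
)''')
conn.commit()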

Code

proxypool.py, line 75: the entry function ProxyPool().run()

def run(self):
    t1 = threading.Thread(target=self._api)
    t2 = threading.Thread(target=self._monitor)
    t1.start()
    t2.start()

Two threads are started: one runs the API server that hands out proxy IPs, the other keeps the IPs in the pool alive.

Thread 1

def _api(self):
    ProxyServer(API_CONFIG['PORT'])

class ProxyServer:
    def __init__(self, port):
        self.port = int(port)
        self.run()

    # ...

    def run(self):
        http_server = HTTPServer(('localhost', self.port), self.ProxyPoolHandler)
        logger.info('listened on localhost:%s' % API_CONFIG['PORT'])
        http_server.serve_forever()
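The ProxyPoolHandler referenced above is not shown in the excerpt; a minimal sketch of what such a handler does, in Python 3 terms (the class body and the way it reaches the pool object are my assumptions, not the repo's code):

from http.server import BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs

class ProxyPoolHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # parse_qs yields lists, e.g. {'num': ['5']}, matching the v[0] access in get_proxy below
        params = parse_qs(urlparse(self.path).query)
        body = self.pool.get_proxy(params)  # hypothetical handle on the pool object
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(body.encode('utf-8'))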

The program runs a local HTTP service; the URL parameters serve as filter conditions to select matching proxies from the database, which are returned as JSON.

def get_proxy(self, params):
    # map the accepted GET parameters to column names
    where_dict = {'port': 'port', 'type': 'type', 'protocol': 'protocol', 'area': 'area'}
    conds = {
        'field': ['ip', 'port'],
        'order': ['updatetime desc', 'lastusedtime', 'score desc', 'speed'],
        'limit': 1,
        'where': [],
    }
    if params:
        for (k, v) in params.items():
            try:
                if k == 'num':
                    conds['limit'] = v[0]
                elif k == 'area':
                    conds['where'].append((where_dict[k], 'like', '%%%s%%' % v[0]))
                else:
                    conds['where'].append((where_dict[k], '=', v[0]))
            except:  # silently skip unknown parameters
                continue
    data = self.sqlite.select(self.table_name, conds)
    # refresh lastusedtime for the proxies about to be served
    tmp = [{'ip': n[0], 'port': n[1], 'lastusedtime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
           for n in data]
    self.sqlite.update(self.table_name, tmp)
    data = ['%s:%s' % n for n in data]
    return json.dumps(data)
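With the server running, filtering is just a matter of GET parameters, whose keys map to where_dict above. For example (the value semantics of type/protocol depend on how the crawler encodes them):

import json
import requests

# ask for up to 5 proxies from a given area
resp = requests.get('http://localhost:8000/', params={'num': 5, 'area': '北京'})
print(json.loads(resp.content))  # e.g. ['1.2.3.4:8080', '5.6.7.8:3128']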

Thread 2

def _monitor(self):
    while True:
        self._update(PROXYPOOL_CONFIG['UPDATE_TIME'])
        self._delete(PROXYPOOL_CONFIG['DELETE_TIME'])
        self._crawl(PROXYPOOL_CONFIG['CRAWL_TIME'])
        time.sleep(1800)

Every half hour the pool goes through a refresh cycle. The program uses the time an IP was written to the database (updatetime) to tell which batch it belongs to. First, existing IPs are re-validated and the ones still alive are saved back:

def _update(self, minutes):
    query = 'SELECT ip,port FROM proxy WHERE updatetime<\'%s\'' % (
        (datetime.datetime.now() - datetime.timedelta(minutes=minutes)).strftime('%Y-%m-%d %H:%M:%S'))
    proxies = ['%s:%s' % n for n in self.sqlite.executesql(query)]
    if proxies:
        avaliable_proxies = self.Validator.run(proxies)
        self.save2sqlite(avaliable_proxies)
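The Validator class is not shown here. The idea behind .run() is to try each proxy against a test URL and keep the responsive ones; a minimal sketch (the test URL, timeout, and sequential loop are assumptions, and the real validator presumably also measures speed for the score/speed columns):

import requests

def validate(proxies, test_url='http://httpbin.org/ip', timeout=5):
    """Return the subset of 'ip:port' strings that answer over HTTP."""
    alive = []
    for p in proxies:
        try:
            requests.get(test_url, proxies={'http': 'http://%s' % p}, timeout=timeout)
            alive.append(p)
        except requests.RequestException:
            continue
    return alive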

Then the previous batch of IPs is deleted:

def _delete(self, minutes):
    query = 'DELETE FROM proxy WHERE updatetime<\'%s\'' % (
        (datetime.datetime.now() - datetime.timedelta(minutes=minutes)).strftime('%Y-%m-%d %H:%M:%S'))
    self.sqlite.executesql(query)

Finally, new IPs are crawled if the pool has run low:

def _crawl(self, minutes):
    query = 'SELECT COUNT(*) FROM proxy WHERE updatetime>\'%s\'' % (
        (datetime.datetime.now() - datetime.timedelta(minutes=minutes)).strftime('%Y-%m-%d %H:%M:%S'))
    count = self.sqlite.executesql(query)[0]
    if int(count[0]) < PROXYPOOL_CONFIG['MIN_IP_NUM']:
        logger.info('Crawl proxy begin')
        proxies = self.Crawler.run()
        logger.info('Crawl proxy end')
        logger.info('Validate proxy begin')
        avaliable_proxies = self.Validator.run(proxies)
        logger.info('Validate proxy end')
        if DB_CONFIG['SQLITE']:
            self.save2sqlite(avaliable_proxies)
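Crawler.run() is likewise out of scope here. Conceptually it fetches free proxy-list pages and extracts ip:port pairs, roughly like this (the URL and regex are placeholders, not the repo's actual sources):

import re
import requests

def crawl(list_url='http://example.com/free-proxy-list'):
    html = requests.get(list_url, timeout=10).text
    # match dotted-quad IPs followed by a port, e.g. '1.2.3.4:8080'
    pairs = re.findall(r'(\d{1,3}(?:\.\d{1,3}){3})[:\s]+(\d{2,5})', html)
    return ['%s:%s' % m for m in pairs]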

Results

Server: (server_pic) Client: (client_pic, client_pic2)

Calling the pool from a program:

import json
import requests

proxies = json.loads(requests.get('http://localhost:8000/').content)
proxy = proxies[0]  # an 'ip:port' string
requests.get(target_url, proxies={'http': 'http://%s' % proxy})  # target_url: the page to fetch
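A proxy can die between being served and being used, so in practice it pays to retry with a fresh one. A small wrapper along these lines (again assuming the localhost:8000 API):

import json
import requests

def get_via_pool(url, tries=3):
    for _ in range(tries):
        proxy = json.loads(requests.get('http://localhost:8000/?num=1').content)[0]
        try:
            return requests.get(url, proxies={'http': 'http://%s' % proxy}, timeout=10)
        except requests.RequestException:
            continue
    raise RuntimeError('no working proxy after %d tries' % tries)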