#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author : lhys
# @FileName: proxy_tool.py
import requests
import threading
# Timeout (seconds) handed to the requests.get() calls that fetch pages and
# test proxies.
timeout = 300

# Module-wide lock held while a proxied request is in flight.
lock = threading.Lock()

# Use your own request headers here.
# NOTE(review): the entry below is only a placeholder (an empty header name is
# not a valid HTTP header) -- replace it with real headers before use.
headers = {
    '': ''
}
class MyProxy:
    """Holds the proxy currently in use and replaces it when it wears out.

    A proxy is replaced when its remaining use count (TTL) reaches zero, or
    when a request through it failed and a follow-up connectivity test also
    fails. Each piece of mutable state (proxy mapping, TTL, error flag) is
    guarded by its own lock; ``set_proxy`` as a whole is not atomic.

    Args:
        proxy_api: URL of an API that returns a fresh proxy as JSON.
        proxy_server: Address of a fixed proxy to start with.
        max_use: Number of requests one proxy may serve before it is replaced.
        try_count: Maximum number of attempts when testing a suspect proxy.

    Raises:
        TypeError: If both ``proxy_api`` and ``proxy_server`` are empty.
    """

    # URL fetched through the proxy to check that it still works.
    _TEST_URL = 'https://www.baidu.com'

    def __init__(self, proxy_api='', proxy_server='', max_use=5000, try_count=5):
        if not (proxy_api or proxy_server):
            raise TypeError('Proxy_api and proxy_server cannot be empty at the same time.')

        # Mapping in the format expected by the ``proxies`` argument of requests.
        self.proxies = None if not proxy_server else {
            'http': proxy_server,
            'https': proxy_server
        }
        # Proxy API URL.
        self.proxy_api = proxy_api
        # Maximum number of uses per proxy IP.
        self.max_use = max_use
        # Number of test attempts before a proxy IP is considered unusable.
        self.try_count = try_count
        # Set to 1 when a crawler request failed; triggers a proxy check.
        self.flag = 0
        # Remaining uses of the current proxy. A proxy supplied up front gets
        # a full budget; otherwise the very first request would find TTL == 0
        # and fail when no API is configured.
        self.proxy_ttl = max_use if proxy_server else 0
        # One lock per piece of shared state.
        self.lock = threading.Lock()
        self.ttl_lock = threading.Lock()
        self.flag_lock = threading.Lock()

    def set_flag(self):
        """Mark the current proxy as suspect after a failed request."""
        with self.flag_lock:
            self.flag = 1

    def _clear_flag(self):
        """Reset the error flag once the proxy is verified or replaced."""
        with self.flag_lock:
            self.flag = 0

    def get_flag(self):
        """Return 1 if the current proxy has been flagged as suspect, else 0."""
        with self.flag_lock:
            return self.flag

    def decrease_ttl(self):
        """Consume one use of the current proxy."""
        with self.ttl_lock:
            self.proxy_ttl -= 1

    def get_ttl(self):
        """Return the number of uses the current proxy has left."""
        with self.ttl_lock:
            return self.proxy_ttl

    def set_ttl(self):
        """Reset the remaining-use counter to ``max_use``."""
        with self.ttl_lock:
            self.proxy_ttl = self.max_use

    def get_proxy(self):
        """Return the current proxies mapping, or None if no proxy is set."""
        with self.lock:
            return self.proxies

    def _proxy_works(self, proxy):
        """Return True if a test request through ``proxy`` succeeds.

        A connection-level failure is treated as definitive; any other error
        is printed and retried, up to ``try_count`` attempts in total.
        """
        for _ in range(self.try_count):
            try:
                requests.get(self._TEST_URL, headers=headers, proxies=proxy, timeout=timeout)
            except (requests.exceptions.ProxyError,
                    requests.exceptions.ConnectionError,
                    requests.exceptions.ConnectTimeout):
                # The exceptions must be a tuple: ``A or B or C`` evaluates to
                # ``A`` alone and silently ignores the other two.
                print(f'Test proxy {proxy} failed.')
                return False
            except Exception as e:
                print(e)
            else:
                print(f'Test proxy {proxy} successfully.')
                return True
        return False

    def set_proxy(self):
        """Make sure a usable proxy is installed, fetching a new one if needed.

        Does nothing when the current proxy still has uses left and is not
        flagged. A flagged proxy with uses left is tested first and kept if
        the test passes. Otherwise a new proxy is requested from
        ``proxy_api`` until one is obtained.

        Raises:
            ValueError: If a new proxy is needed but ``proxy_api`` is not set.
        """
        if self.get_ttl() > 0:
            if not self.get_flag():
                return
            current = self.get_proxy()
            # Never "test" a missing proxy: that request would go out over a
            # direct connection and wrongly validate it.
            if current is not None and self._proxy_works(current):
                # Clear the flag, or every later request would re-test.
                self._clear_flag()
                return

        if not self.proxy_api:
            raise ValueError('代理 IP 不可用,且代理 IP API未设置。')

        while True:
            res = requests.get(self.proxy_api, timeout=timeout)
            # The response layout below follows the author's own proxy
            # provider documentation; adapt it to your provider.
            try:
                payload = res.json()
                if payload["ERRORCODE"] == "0":
                    entry = payload["RESULT"][0]
                    ip, port = entry['ip'], entry['port']
                    address = f'http://{ip}:{port}'
                    with self.lock:
                        self.proxies = {
                            'http': address,
                            'https': address
                        }
                    print(f'Set proxy: {ip}:{port}.')
                    self._clear_flag()
                    self.set_ttl()
                    return
                print('Set proxy failed.')
            except Exception as e:
                print(e)
# Module-level proxy manager used by request_by_proxy().
# NOTE(review): MyProxy() with no arguments raises TypeError (its constructor
# requires proxy_api or proxy_server) -- pass your own proxy API URL or proxy
# server address here.
Proxy = MyProxy()
def request_by_proxy(url, use_proxy=True):
    """GET ``url``, optionally through the shared proxy, retrying on failure.

    Request-level errors (``requests.exceptions.RequestException``) are
    printed and the request is retried indefinitely. Any other exception,
    such as the ``ValueError`` raised by ``Proxy.set_proxy`` when no proxy
    can be obtained, propagates to the caller.

    Args:
        url: Address to fetch.
        use_proxy: When true, route the request through the module-level
            ``Proxy``; otherwise connect directly.

    Returns:
        The ``requests`` response object of the first successful attempt.
    """
    while True:
        try:
            if not use_proxy:
                return requests.get(url, headers=headers, timeout=timeout)

            # Proxied request.
            proxy_ttl = Proxy.get_ttl()
            print(proxy_ttl)
            # Refresh the proxy when its use budget is spent or an earlier
            # request through it failed.
            if proxy_ttl <= 0 or Proxy.get_flag():
                Proxy.set_proxy()
                print(Proxy.get_ttl())
            proxy = Proxy.get_proxy()
            # ``with`` releases the lock exactly once on every path. The old
            # handlers called lock.release() even when the failure happened
            # before acquire(), raising RuntimeError.
            with lock:
                res = requests.get(url, headers=headers, proxies=proxy, timeout=timeout)
            Proxy.decrease_ttl()
            return res
        except requests.exceptions.ProxyError as pe:
            print(f'Proxy {Proxy.get_proxy()} is not available, reason: {pe}.')
            if use_proxy:
                Proxy.set_flag()
        except requests.exceptions.Timeout as t:
            print(f'Time out, reason: {t}.')
            if use_proxy:
                Proxy.set_flag()
        except requests.exceptions.RequestException as e:
            print(e)
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author : lhys
# @FileName: spider.py
import time
import threading
from multiprocessing import Queue
from proxy_tool import request_by_proxy
# Seconds a spider pauses after each URL, and how long it waits for new work
# when the queue is empty.
threshold = 30
# Queue of URLs consumed by the Spider threads.
queue = Queue()
class Spider(threading.Thread):
    """Worker thread that takes URLs from the module-level ``queue`` and fetches them.

    Args:
        use_proxy: Passed through to ``request_by_proxy`` for every URL.
    """

    def __init__(self, use_proxy=True):
        super().__init__()
        self.use_proxy = use_proxy

    def get_data(self, url):
        """Fetch ``url``; any exception is printed and swallowed."""
        try:
            res = request_by_proxy(url, self.use_proxy)
            # TODO: handle the response ``res`` here.
        except Exception as e:
            print(e)

    def run(self):
        """Process URLs until the queue stays empty for ``threshold`` seconds."""
        # Imported locally under an alias because the module-level name
        # ``queue`` is the URL queue object, not the stdlib module.
        from queue import Empty as QueueEmpty

        while True:
            # Wait up to ``threshold`` seconds for work. A single blocking
            # get() replaces the old empty()-then-get() sequence, which could
            # block forever if another thread took the last item in between.
            try:
                url = queue.get(timeout=threshold)
            except QueueEmpty:
                print('Queue is empty.')
                return
            self.get_data(url)
            time.sleep(threshold)