2065 字
10 分钟
爬虫逆向(爬虫逆向进阶实战总结)
爬虫逆向(爬虫逆向进阶实战总结)
一、常见的压缩与混淆
LSB隐写
LSB(Least Significant Bit)隐写是一种将信息嵌入到图像的最低有效位的技术。以下是一个Python实现:
from PIL import Image
def plus(str):
    """Left-pad a digit string with zeros so it is at least 8 characters long.

    Note: the parameter name shadows the builtin ``str``; it is kept because
    it is part of the function's signature.
    """
    padded = str.zfill(8)
    return padded
def get_key(strr):
    """Encode *strr* as one string of binary digits.

    Each character's code point is written in binary, left-padded with zeros
    to at least 8 digits, and the groups are concatenated in order.

    NOTE(review): code points above 255 yield groups longer than 8 digits,
    so a fixed-width decoder would mis-read them - confirm inputs are ASCII.
    """
    # format(n, "08b") equals bin(n)[2:].zfill(8) for the non-negative ints
    # returned by ord(); join avoids repeated string concatenation.
    return "".join(format(ord(ch), "08b") for ch in strr)
def mod(x, y):
    """Return the remainder of *x* divided by *y* (Python ``%`` semantics)."""
    remainder = x % y
    return remainder
def func(old_img, str2, new_img):
    """Hide *str2* in the least significant bits of an image and save it.

    The message is converted to a bit string by ``get_key``. Bits are written
    row by row into the low bit of the first three channel values of each
    pixel, and the modified image is saved to *new_img*.

    If the image has too few pixels, the remaining bits are silently dropped
    (same as the original behaviour).
    NOTE(review): assumes ``getpixel`` returns a tuple with at least three
    channels (e.g. RGB) - confirm for palette/greyscale inputs.
    """
    # The context manager closes the source file, which was previously left open.
    with Image.open(old_img) as im:
        width, height = im.size
        key = get_key(str2)
        keylen = len(key)
        count = 0
        for h in range(height):
            if count == keylen:
                break  # payload fully written; skip the remaining rows
            for w in range(width):
                if count == keylen:
                    break
                pixel = im.getpixel((w, h))
                channels = list(pixel[:3])
                for idx in range(3):
                    if count == keylen:
                        break
                    # Clear the low bit, then set it to the next payload bit.
                    channels[idx] = channels[idx] - mod(channels[idx], 2) + int(key[count])
                    count += 1
                im.putpixel((w, h), tuple(channels))
        im.save(new_img)
# 使用示例
# 注意:JPEG 是有损压缩,保存为 .jpg 可能破坏写入的最低有效位,实际使用时建议输出为 PNG 等无损格式。
old_img = r"timg.jpg"
new_img = r"timg2.jpg"
func(old_img, "Lx Is Good Man", new_img)

二、常见的编码与加密
Base64
import base64

print(base64.b64encode('lx'.encode()))  # encode
print(base64.b64decode('bHg='.encode()))  # decode

MD5
import hashlib

m = hashlib.md5()
m.update(b'12')
# md5(b'12') is c20ad4d76fe97759aa27a0c99bff6710
# (202cb962ac59075b964b07152d234b70 is the digest of b'123').
print(m.hexdigest())

DES
import binascii
from pyDes import des, CBC, PAD_PKCS5
def des_encrypt(secret_key, s):
    """Encrypt *s* with DES in CBC mode (PKCS5 padding) and return hex bytes.

    *secret_key* is used both as the key and as the IV.
    NOTE(review): pyDes expects an 8-byte key and IV - confirm callers pass one.
    """
    iv = secret_key
    k = des(secret_key, CBC, iv, pad=None, padmode=PAD_PKCS5)
    en = k.encrypt(s, padmode=PAD_PKCS5)
    return binascii.b2a_hex(en)
def des_decrypt(secret_key, s):
    """Decrypt hex-encoded DES-CBC ciphertext *s* and return the plaintext bytes.

    *secret_key* is used both as the key and as the IV, mirroring
    ``des_encrypt``.
    """
    iv = secret_key
    k = des(secret_key, CBC, iv, pad=None, padmode=PAD_PKCS5)
    de = k.decrypt(binascii.a2b_hex(s), padmode=PAD_PKCS5)
    return de
# NOTE(review): pyDes rejects keys that are not exactly 8 bytes long, so the
# 3-character key '999' presumably needs replacing with a real 8-byte key.
secret_str = des_encrypt('999', 'lx-message')
clear_str = des_decrypt('999', secret_str)

AES
import base64
from Crypto.Cipher import AES
def add_to_16(value):
    """Encode *value* to bytes and NUL-pad it to a multiple of 16 bytes.

    Padding is computed on the encoded byte length, so text containing
    multi-byte characters is also aligned to the 16-byte block size.
    An empty string yields ``b''``.
    """
    data = str.encode(value)
    remainder = len(data) % 16
    if remainder:
        data += b'\0' * (16 - remainder)
    return data
def encrypt(key, text):
    """AES-ECB encrypt *text* under *key* and return the result as Base64 text.

    Both arguments are NUL-padded to a 16-byte multiple by ``add_to_16``.
    ``base64.encodebytes`` appends a trailing newline to its output.
    """
    aes = AES.new(add_to_16(key), AES.MODE_ECB)
    encrypt_aes = aes.encrypt(add_to_16(text))
    encrypted_text = str(base64.encodebytes(encrypt_aes), encoding='utf-8')
    return encrypted_text
def decrypt(key, text): aes = AES.new(add_to_16(key), AES.MODE_ECB) base64_decrypted = base64.decodebytes(text.encode(encoding='utf-8')) decrypted_text = str(aes.decrypt(base64_decrypted), encoding='utf-8').replace('\0', '') return decrypted_textJavaScript实现:
let password = "lx123";
let key = "1234567890abcdef";
// NOTE(review): the identifier `CryptoJs` must match however the library was
// imported (the crypto-js package is usually bound as `CryptoJS`) - confirm.
// `cfg` is now declared explicitly instead of leaking as an implicit global.
let cfg = {
    mode: CryptoJs.mode.ECB,
    padding: CryptoJs.pad.Pkcs7
};
let encPwd = CryptoJs.AES.encrypt(password, key, cfg).toString();
let decPwd = CryptoJs.AES.decrypt(encPwd, key, cfg).toString(CryptoJs.enc.Utf8);

RSA
import base64
import rsa
from rsa import common
class RsaUtil(object): PUBLIC_KEY_PATH = 'public_key.pem' PRIVATE_KEY_PATH = 'private_key.pem'
def __init__(self, company_pub_file=PUBLIC_KEY_PATH, company_pri_file=PRIVATE_KEY_PATH): if company_pub_file: self.company_public_key = rsa.PublicKey.load_pkcs1_openssl_pem(open(company_pub_file).read()) if company_pri_file: self.company_private_key = rsa.PrivateKey.load_pkcs1(open(company_pri_file).read())
def get_max_length(self, rsa_key, encrypt=True): blocksize = common.byte_size(rsa_key.n) reserve_size = 11 if not encrypt: reserve_size = 0 maxlength = blocksize - reserve_size return maxlength
def encrypt_by_public_key(self, message): encrypt_result = b'' max_length = self.get_max_length(self.company_public_key) while message: input = message[:max_length] message = message[max_length:] out = rsa.encrypt(input, self.company_public_key) encrypt_result += out encrypt_result = base64.b64encode(encrypt_result) return encrypt_result
def decrypt_by_private_key(self, message): decrypt_result = b"" max_length = self.get_max_length(self.company_private_key, False) decrypt_message = base64.b64decode(message) while decrypt_message: input = decrypt_message[:max_length] decrypt_message = decrypt_message[max_length:] out = rsa.decrypt(input, self.company_private_key) decrypt_result += out return decrypt_resultJavaScript实现:
const JSEncrypt = require('jsencrypt');publickey = '公钥';
let jse = new JSEncrypt();jse.setPublicKey(publickey);var encStr = jse.encrypt('username');
privatekey = '私钥';jse.setPrivateKey(privatekey);var Str = jse.decrypt(encStr);三、加密参数还原与模拟(实战)
1. Python模拟JS
import execjsjs = 'function test() { return "hello"; }'xvt_anti = execjs.compile(js).call('test')2. MD5加密逆向
import hashlibm = hashlib.md5()m.update(b'12')print(m.hexdigest())3. RSA加密逆向案例
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5
import base64
def encrypt_str(data):
    """RSA-encrypt *data* (PKCS#1 v1.5) with the embedded public key.

    The key is a Base64-encoded public key hard-coded below. The UTF-8
    encoding of *data* is encrypted and returned as Base64 bytes.
    """
    key = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsgDq4OqxuEisnk2F0EJFmw4xKa5IrcqEYHvqxPs2CHEg2kolhfWA2SjNuGAHxyDDE5MLtOvzuXjBx/5YJtc9zj2xR/0moesS+Vi/xtG1tkVaTCba+TV+Y5C61iyr3FGqr+KOD4/XECu0Xky1W9ZmmaFADmZi7+6gO9wjgVpU9aLcBcw/loHOeJrCqjp7pA98hRJRY+MML8MK15mnC4ebooOva+mJlstW6t/1lghR8WNV8cocxgcHHuXBxgns2MlACQbSdJ8c6Z3RQeRZBzyjfey6JCCfbEKouVrWIUuPphBL3OANfgp0B+QG31bapvePTfXU48TYK0M5kE+8LgbbWQIDAQAB"
    rsakey = RSA.import_key(base64.b64decode(key))
    cipher = PKCS1_v1_5.new(rsakey)
    cipher_text = base64.b64encode(cipher.encrypt(data.encode(encoding="utf-8")))
    return cipher_text
password = encrypt_str("11")print(password)4. AES逆向
数据加密逆向
import requests
import base64, json, re
from Crypto.Cipher import AES
def decrypt(info: str) -> list:
    """Decrypt a Base64 AES-ECB payload and parse it as JSON.

    *info* is Base64-decoded, decrypted with the hard-coded key, decoded as
    UTF-8, stripped of control characters and passed to ``json.loads``.
    """
    key = '3sd&d24h@$udD2s*'.encode(encoding='utf-8')
    cipher = AES.new(key, mode=AES.MODE_ECB)
    json_str = str(cipher.decrypt(base64.b64decode(info)), encoding='utf-8')
    # Remove control characters except \n (0x0a) and \r (0x0d); presumably
    # these are block-padding bytes left after decryption. The original class
    # contained literal '|' characters, which also stripped '|' from the data.
    data = re.sub('[\x00-\x09\x0b\x0c\x0e-\x1f]', '', json_str)
    return json.loads(data)
headers = {}  # fill in the real request headers
url = "https://api.hanghangcha.com/hhc/tag"
res = requests.get(url, headers=headers)
payload = json.loads(res.content)['data']
data = decrypt(payload)
print(data)

数据链接逆向
import execjs
js = '''var CryptoJS = require("crypto-js");
function lx(hh) { var aa = hh.split("/"); var aaa = aa.length; var bbb = aa[aaa - 1].split('.'); var ccc = bbb[0]; var cccc = bbb[1]; var r = /^\+?[1-9][0-9]*$/;
var srcs = CryptoJS.enc.Utf8.parse(ccc); var s = "qnbyzzwmdgghmcnm"; var k = CryptoJS.enc.Utf8.parse(s); var en = CryptoJS.AES.encrypt(srcs, k, { mode: CryptoJS.mode.ECB, padding: CryptoJS.pad.Pkcs7 }); var ddd = en.toString(); ddd = ddd.replace(/\//g, "^"); ddd = ddd.substring(0, ddd.length - 2); var bbbb = ddd + '.' + bbb[1]; aa[aaa - 1] = bbbb; var uuu = ''; for (i = 0; i < aaa; i++) { uuu += aa[i] + '/' } uuu = uuu.substring(0, uuu.length - 1); return uuu;}'''
hh = "http://ggzy.zwfwb.tj.gov.cn:80/jyxxcgjg/970369.jhtml"url = execjs.compile(js).call('lx', hh)print(url)四、环境补充
1. 手动补充环境
基础环境
document = { createElement: function(x) { return {} }};
var _getXxx = { toString: function() { return "" }};
_getXxx.__proto__.getA = function getA(x) { return {}};
// Node.js stand-ins for the browser globals.
window = global;
document = {referrer: ""};
location = {
    hash: "",
    host: "",
    hostname: "",
    href: "",
    origin: "",
    pathname: "/",
    port: "",
    protocol: "https:",
    search: ""
};
navigator = {
    appCodeName: "Mozilla",
    appName: "Netscape",
    appVersion: "5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36",
    cookieEnabled: true,
    deviceMemory: 8,
    doNotTrack: null,
    hardwareConcurrency: 4,
    language: "zh-CN",
    languages: ["zh-CN", "zh"],
    maxTouchPoints: 0,
    onLine: true,
    platform: "Win32",
    product: "Gecko",
    productSub: "20030107",
    userAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36",
    vendor: "Google Inc.",
    vendorSub: "",
};

补充document方法
// `document` stub with no-op event helpers; createElement returns an empty
// object for every tag (the empty-string branch is a placeholder for
// tag-specific stubs).
var document = {
    createEvent: function createEvent() {},
    addEventListener: function addEventListener(x) {},
    createElement: function createElement(x) {
        if (x === "") {
            return {}
        }
        return {}
    }
};

补充canvas
// createElement always hands back the canvas stub defined below; `canvas`
// is only looked up when createElement is called, so the order is fine.
document = {
    createElement: function createElement(x) {
        return canvas
    }
};

canvas = {
    toDataURL: function toDataURL() {
        return "data:image/png;base64,i.....ggg=="
    },
    getContext: function getContext(x) {
        if (x === "2d") {
            return CanvasContext
        } else {
            return {}
        }
    }
};

// 2D context stub: drawing calls are no-ops.
CanvasContext = {
    arc: function arc() {},
    stroke: function stroke() {},
    fillText: function fillText() {},
    toString: function() {
        return "[object]"
    }
};

// Makes Object.prototype.toString.call(canvas) report "[object HTMLCanvasElement]".
canvas[Symbol.toStringTag] = "HTMLCanvasElement";

重写String的indexOf方法
// Wrap String.prototype.indexOf so that searching for 'lx' always reports
// "not found"; every other search is delegated to the native method.
var _indexOf = String.prototype.indexOf;
String.prototype.indexOf = function (searchValue, fromIndex) {
    if (searchValue === 'lx') {
        return -1;
    }
    return _indexOf.apply(this, [searchValue, fromIndex]);
}

重写toString方法
var newString = Function.prototype.toString;Function.prototype.toString = function () { if (this === Window || this === Location || this === Function.prototype.toString) { return "function Window() { [native code] }"; } return newString.apply(this);};2. JSDOM环境补充
const { JSDOM } = require('jsdom');
const dom = new JSDOM(` <!DOCTYPE html> <html> <body> <script> // 模拟环境 window = { document: { location: { href: 'https://example.com' } } }; // 你的JS代码 function get_signature(url) { return "signature"; } </script> </body> </html>`);
const window = dom.window;const document = window.document;
// 执行JS代码const signature = window.get_signature('https://example.com');console.log(signature);3. Selenium环境模拟
# -*- coding: utf-8 -*-
import os
from selenium import webdriver
# User agent passed to headless Chrome and embedded in the generated page.
ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36'
# Directory containing this script.
PRO_DIR = os.path.dirname(os.path.abspath(__file__))

# HTML written before (s1) and after (s2) the generated JavaScript.
s1 = """ <!DOCTYPE html> <html style="font-size: 50px;"> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>signature-hook</title> </head> <body></body>
<script type="text/javascript"> """
s2 = """ </script> </html> """
def driver_sig(html_file):
    """Open *html_file* from ``PRO_DIR`` in headless Chrome and return the page title.

    The browser is always shut down, even if loading the page fails.
    """
    option = webdriver.ChromeOptions()
    option.add_argument('headless')
    option.add_argument('--no-sandbox')
    option.add_argument('--user-agent={}'.format(ua))
    # Selenium 4 takes ``options=``; the old ``chrome_options=`` keyword was removed.
    driver = webdriver.Chrome(options=option)
    try:
        # os.path.join inserts the separator that plain concatenation omitted.
        driver.get('file:///' + os.path.join(PRO_DIR, html_file))
        return driver.title
    finally:
        driver.quit()
# JavaScript injected into the page. Literal JS braces are doubled so that
# str.format only substitutes the user agent. The signature is also copied
# into document.title, because that is what driver_sig reads back.
sign_js = '''window.navigator = {{ userAgent: '{}' }};
function get_sign() {{
    // 模拟签名逻辑
    return "signature";
}}
var signature = get_sign();
document.clear();
document.write(signature);
document.title = signature;
'''.format(ua)
if __name__ == '__main__': html_file = 'get_sign.html' with open(html_file, 'w', encoding='utf-8') as fw: fw.write(s1 + sign_js + s2) sig = driver_sig(html_file) print(sig)五、浏览器环境监测
1. Proxy-intercept
window = new Proxy(global, { get: function(target, key, receiver) { console.log("window.get", key, target[key]); if (key === "location") { location = new Proxy(target[key], { get: function(_target, key, _receiver) { console.log("window.get", key, _target[key]); if (key === "port") { console.log("公众号【Pythonxy】"); } return _target[key]; } }); } return target[key]; }, set: function(target, key, value, receiver) { console.log("window.set", key, value); target[key] = value; }});六、自动化工具
1. Selenium
常用操作
| 操作 | 代码 | 说明 |
|---|---|---|
| 获取标题 | driver.title | 获取当前页面标题 |
| 获取URL | driver.current_url | 获取当前页面URL |
| 获取源码 | driver.page_source | 获取当前页面源码 |
| 窗口最大化 | driver.maximize_window() | 浏览器窗口最大化 |
| 设置窗口大小 | driver.set_window_size(480, 480) | 设置浏览器窗口宽高 |
| 刷新页面 | driver.refresh() | 刷新当前页面 |
| 页面后退 | driver.back() | 浏览器后退 |
| 页面前进 | driver.forward() | 浏览器前进 |
| 截图 | driver.get_screenshot_as_file('screenshot.png') | 截屏保存为文件 |
元素定位方法
| 定位方式 | 代码 | 说明 |
|---|---|---|
| 标签定位 | driver.find_element(By.TAG_NAME, 'div') | 通过标签名定位 |
| 文字定位 | driver.find_element(By.LINK_TEXT, '百度') | 通过链接文字定位 |
| 部分文字定位 | driver.find_element(By.PARTIAL_LINK_TEXT, '百') | 通过部分链接文字定位 |
| XPath定位 | driver.find_element(By.XPATH, '//div[@id="kw"]') | 通过XPath定位 |
| Class定位 | driver.find_element(By.CLASS_NAME, 'input') | 通过Class定位 |
| ID定位 | driver.find_element(By.ID, 'kw') | 通过ID定位 |
元素操作
| 操作 | 代码 | 说明 |
|---|---|---|
| 单击 | driver.find_element(By.ID, 'kw').click() | 点击元素 |
| 输入 | driver.find_element(By.ID, 'kw').send_keys('selenium') | 在元素中输入文本 |
| 提交 | driver.find_element(By.ID, 'su').submit() | 提交表单 |
| 清空 | driver.find_element(By.ID, 'kw').clear() | 清空输入框 |
鼠标事件
from selenium.webdriver import ActionChains

# Assumes `driver` and `By` are already in scope.
action = ActionChains(driver)
kw = driver.find_element(By.ID, 'kw')
action.context_click(kw).perform()  # right-click

键盘事件
from selenium.webdriver.common.keys import Keys

driver.find_element(By.ID, 'kw').send_keys(Keys.CONTROL, 'a')  # select all
driver.find_element(By.ID, 'kw').send_keys(Keys.CONTROL, 'x')  # cut
driver.find_element(By.ID, 'kw').send_keys(Keys.CONTROL, 'v')  # paste

窗口句柄处理
now_handle = driver.current_window_handle  # handle of the current window
driver.find_element(By.LINK_TEXT, '登录').click()  # click the login link
# Selenium stays on the original window after a click, so the new window's
# handle is taken from window_handles, not from current_window_handle.
new_handle = driver.window_handles[-1]
driver.switch_to.window(now_handle)  # switch back to the original window
driver.switch_to.window(driver.window_handles[1])  # switch to the new window

处理iframe
frame_reference = driver.find_element_by_id('frame_id')driver.switch_to.frame(frame_reference) # 切换到iframedriver.switch_to.default_content() # 返回主文档2. Pyppeteer
Pyppeteer是Puppeteer的Python版本,用于浏览器自动化。
安装
pip install pyppeteer

基本使用
import asyncio
from pyppeteer import launch
async def main():
    """Launch a visible browser, open example.com and save a screenshot."""
    browser = await launch(headless=False)
    try:
        page = await browser.newPage()
        await page.goto('https://example.com')
        await page.screenshot({'path': 'example.png'})
    finally:
        # Close the browser even if navigation or the screenshot fails.
        await browser.close()
asyncio.get_event_loop().run_until_complete(main())

处理JavaScript
await page.evaluate('''() => { document.querySelector('#search').value = 'pyppeteer'; document.querySelector('#submit').click();}''')处理弹窗
async def handle_alert(page): page.on('dialog', lambda dialog: dialog.accept()) await page.goto('https://example.com')七、反爬虫策略应对
1. Selenium绕过检测
from selenium.webdriver import Chrome
from selenium.webdriver import ChromeOptions
# Chrome 79 版本之前
option = ChromeOptions()
option.add_experimental_option('excludeSwitches', ['enable-automation'])
driver = Chrome(options=option)

# Chrome 79 版本之后
driver = Chrome()
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
    "source": """Object.defineProperty(navigator, 'webdriver', {get: () => false})"""
})

# Chrome 88 版本之后
chrome_options = ChromeOptions()
chrome_options.add_argument("disable-blink-features")
chrome_options.add_argument("disable-blink-features=AutomationControlled")
driver = Chrome(options=chrome_options)

2. 检测浏览器特征
常见的检测特征包括:
- navigator.webdriver
- navigator.languages
- navigator.userAgent
- navigator.vendor
- window.chrome
- window.navigator.plugins
- window.screen
八、RPC数据获取(示例)
1. 定位关键点
在JS中,new WebSocket()是固定语法,可用于定位关键词。
ws.onmessage = function(event) { var data = event.data;};2. RPC调用示例
import asyncio
import websockets
async def check_permit(websocket):
    """Send the fixed text ``'lx'`` to the client and return ``True``."""
    send_text = 'lx'
    await websocket.send(send_text)
    return True
async def recv_msg(websocket):
    """Print every message received on *websocket*, forever.

    The loop has no exit condition; it ends only when ``recv()`` raises.
    """
    while 1:
        recv_text = await websocket.recv()
        print(recv_text)
async def main_logic(websocket, path=None):
    """Connection handler: send the greeting, then print incoming messages.

    *path* is unused. It is optional so the handler works whether the
    websockets library calls it with ``(websocket, path)`` or only with
    ``(websocket)``.
    """
    await check_permit(websocket)
    await recv_msg(websocket)
start_server = websockets.serve(main_logic, '127.0.0.1', 9999)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()3. 使用示例
import websocket
import json
def on_message(ws, message):
    """Print an incoming message prefixed with ``Received: ``."""
    line = "Received: " + message
    print(line)
def on_error(ws, error):
    """Print an error prefixed with ``Error: ``."""
    description = str(error)
    print("Error: " + description)
def on_close(ws, close_status_code, close_msg):
    """Print a fixed notice when the connection closes; the arguments are unused."""
    notice = "Connection closed"
    print(notice)
def on_open(ws):
    """Send a JSON login message as soon as the connection opens."""
    login_message = {
        "type": "login",
        "data": {"username": "user", "password": "pass"},
    }
    ws.send(json.dumps(login_message))
if __name__ == "__main__":
    # Wire the callbacks into the client and block until the connection ends.
    ws = websocket.WebSocketApp(
        "wss://example.com/socket",
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()

爬虫逆向(爬虫逆向进阶实战总结)
https://fuwari.vercel.app/posts/crawler/base/baseinfo/