2065 字
10 分钟
爬虫逆向(爬虫逆向进阶实战总结)

爬虫逆向(爬虫逆向进阶实战总结)#

一、常见的压缩与混淆#

LSB隐写#

LSB(Least Significant Bit)隐写是一种将信息嵌入到图像的最低有效位的技术。以下是一个Python实现:

from PIL import Image
def plus(str):
    """Left-pad a string of binary digits with zeros to a width of 8.

    Strings that are already 8 characters or longer come back unchanged.
    """
    padded = str.zfill(8)
    return padded
def get_key(strr):
    """Turn text into one long string of binary digits.

    Each character's code point is written in base 2 and zero-padded to at
    least 8 digits. Padding never truncates, so a code point above 255
    contributes a group longer than 8 digits.
    """
    return "".join(format(ord(ch), "b").zfill(8) for ch in strr)
def mod(x, y):
    """Return the remainder of ``x`` divided by ``y`` using Python's ``%``."""
    remainder = x % y
    return remainder
def func(old_img, str2, new_img):
    """Hide ``str2`` in the least significant bits of an image.

    The message is converted to a bit string by ``get_key`` and written one
    bit per colour channel (first three channels of each pixel), visiting
    pixels row by row from the top-left corner.

    Args:
        old_img: Path of the cover image to read.
        str2: Text to embed.
        new_img: Path the modified image is saved to. NOTE(review): pick a
            lossless format such as PNG; a lossy encoder (e.g. JPEG) is not
            expected to preserve the low-order bits written here.

    Raises:
        ValueError: If the image does not have enough colour channels to hold
            the whole message (the original silently dropped the excess).
    """
    im = Image.open(old_img)
    # Greyscale/palette images yield ints or short tuples from getpixel(),
    # which cannot carry three channels; promote those to RGB first.
    if len(im.getbands()) < 3:
        im = im.convert("RGB")
    width, height = im.size

    key = get_key(str2)
    keylen = len(key)
    capacity = width * height * 3
    if keylen > capacity:
        raise ValueError(
            "message needs {} bits but the image can hold only {}".format(keylen, capacity)
        )

    # Only the pixels that actually receive bits are visited: every pixel
    # takes up to three consecutive bits of the key.
    for start in range(0, keylen, 3):
        h, w = divmod(start // 3, width)
        pixel = im.getpixel((w, h))
        channels = list(pixel[:3])
        for offset, bit in enumerate(key[start:start + 3]):
            # Clear the lowest bit, then set it to the message bit.
            channels[offset] = (channels[offset] & ~1) | int(bit)
        # Keep any extra band (e.g. alpha) exactly as it was.
        im.putpixel((w, h), tuple(channels) + tuple(pixel[3:]))
    im.save(new_img)
# Usage example
old_img = r"timg.jpg"
# Write the result as PNG: the payload lives in the lowest bit of each channel,
# and a lossy format such as JPEG does not keep those bits intact on save.
new_img = r"timg2.png"
func(old_img, "Lx Is Good Man", new_img)

二、常见的编码与加密#

Base64#

import base64
# Encode: text -> bytes -> Base64 bytes.
encoded = base64.b64encode('lx'.encode())
print(encoded)
# Decode: Base64 bytes -> original bytes.
decoded = base64.b64decode('bHg='.encode())
print(decoded)

MD5#

import hashlib
# Hash the bytes b'12' in one step and print the hexadecimal digest.
m = hashlib.md5(b'12')
print(m.hexdigest())  # prints c20ad4d76fe97759aa27a0c99bff6710

DES#

import binascii
from pyDes import des, CBC, PAD_PKCS5
def des_encrypt(secret_key, s):
    """Encrypt ``s`` with DES in CBC mode and return the ciphertext as hex.

    The key is also used as the IV, and PKCS5 padding is applied.
    """
    cipher = des(secret_key, CBC, secret_key, pad=None, padmode=PAD_PKCS5)
    ciphertext = cipher.encrypt(s, padmode=PAD_PKCS5)
    return binascii.b2a_hex(ciphertext)
def des_decrypt(secret_key, s):
    """Decrypt hex-encoded DES/CBC ciphertext ``s`` and return the plaintext.

    The key is also used as the IV; PKCS5 padding is removed.
    """
    cipher = des(secret_key, CBC, secret_key, pad=None, padmode=PAD_PKCS5)
    ciphertext = binascii.a2b_hex(s)
    return cipher.decrypt(ciphertext, padmode=PAD_PKCS5)
# pyDes only accepts DES keys that are exactly 8 bytes long (the helpers reuse
# the key as the 8-byte IV), so the 3-character key '999' is rejected.
secret_str = des_encrypt('99999999', 'lx-message')
clear_str = des_decrypt('99999999', secret_str)

AES#

import base64
from Crypto.Cipher import AES
def add_to_16(value):
    """Encode ``value`` and NUL-pad it to a multiple of 16 bytes.

    Padding is computed on the encoded bytes, not on the character count, so
    text with multi-byte characters still ends up block-aligned.

    Args:
        value: Text to encode (``str``).

    Returns:
        The encoded bytes followed by as many ``b'\\0'`` bytes as are needed
        to reach a multiple of 16; input that is already aligned (including
        the empty string) gets no padding.
    """
    data = str.encode(value)
    remainder = len(data) % 16
    if remainder:
        data += b'\0' * (16 - remainder)
    return data
def encrypt(key, text):
    """AES-ECB encrypt ``text`` under ``key`` and return Base64 text.

    Both key and text are run through ``add_to_16`` first. The result comes
    from ``base64.encodebytes``, decoded to ``str``.
    """
    cipher = AES.new(add_to_16(key), AES.MODE_ECB)
    raw = cipher.encrypt(add_to_16(text))
    return base64.encodebytes(raw).decode('utf-8')
def decrypt(key, text):
    """Decrypt Base64 AES-ECB ciphertext produced with the same ``key``.

    Args:
        key: Key text; padded with ``add_to_16`` before use.
        text: Base64-encoded ciphertext (``str``).

    Returns:
        The decrypted text with the trailing NUL padding removed.
    """
    cipher = AES.new(add_to_16(key), AES.MODE_ECB)
    ciphertext = base64.decodebytes(text.encode(encoding='utf-8'))
    plaintext = cipher.decrypt(ciphertext)
    # Padding is only ever appended, so strip NULs from the end only; removing
    # every NUL would also delete NUL characters that belong to the message.
    return plaintext.rstrip(b'\0').decode('utf-8')

JavaScript实现:

// Requires the crypto-js library in scope as `CryptoJS`
// (for example: const CryptoJS = require("crypto-js");).
let password = "lx123";
// Parse the key into a WordArray. A plain string would be treated as a
// passphrase from which a different key is derived, so the ciphertext would
// not match an AES-ECB implementation that uses these 16 bytes directly.
let key = CryptoJS.enc.Utf8.parse("1234567890abcdef");
// Declared explicitly instead of leaking an implicit global.
const cfg = {
  mode: CryptoJS.mode.ECB,
  padding: CryptoJS.pad.Pkcs7
};
let encPwd = CryptoJS.AES.encrypt(password, key, cfg).toString();
let decPwd = CryptoJS.AES.decrypt(encPwd, key, cfg).toString(CryptoJS.enc.Utf8);

RSA#

import base64
import rsa
from rsa import common
class RsaUtil(object):
    """RSA helper that encrypts and decrypts long messages chunk by chunk.

    Keys are loaded from PEM files with the ``rsa`` package. Ciphertext is
    exchanged as Base64 of the concatenated encrypted chunks.
    """

    PUBLIC_KEY_PATH = 'public_key.pem'
    PRIVATE_KEY_PATH = 'private_key.pem'

    def __init__(self, company_pub_file=PUBLIC_KEY_PATH, company_pri_file=PRIVATE_KEY_PATH):
        """Load the key files that are given.

        Args:
            company_pub_file: Path of the public key PEM; a falsy value skips
                loading.
            company_pri_file: Path of the private key PEM; a falsy value skips
                loading.
        """
        # Define both attributes up front so that a skipped key reads as None
        # instead of being a missing attribute.
        self.company_public_key = None
        self.company_private_key = None
        if company_pub_file:
            with open(company_pub_file) as pub_file:
                self.company_public_key = rsa.PublicKey.load_pkcs1_openssl_pem(pub_file.read())
        if company_pri_file:
            with open(company_pri_file) as pri_file:
                self.company_private_key = rsa.PrivateKey.load_pkcs1(pri_file.read())

    def get_max_length(self, rsa_key, encrypt=True):
        """Return the chunk size in bytes to use with ``rsa_key``.

        For encryption 11 bytes of the key's byte size are reserved
        (presumably the PKCS#1 v1.5 padding overhead -- confirm against the
        ``rsa`` package docs); for decryption the full byte size is used.
        """
        blocksize = common.byte_size(rsa_key.n)
        reserve_size = 11 if encrypt else 0
        return blocksize - reserve_size

    def encrypt_by_public_key(self, message):
        """Encrypt ``message`` (bytes) with the public key.

        Returns:
            Base64-encoded bytes of all encrypted chunks joined together.
        """
        max_length = self.get_max_length(self.company_public_key)
        chunks = [
            rsa.encrypt(message[start:start + max_length], self.company_public_key)
            for start in range(0, len(message), max_length)
        ]
        return base64.b64encode(b''.join(chunks))

    def decrypt_by_private_key(self, message):
        """Decrypt Base64 ``message`` produced by ``encrypt_by_public_key``.

        Returns:
            The decrypted bytes of all chunks joined together.
        """
        max_length = self.get_max_length(self.company_private_key, False)
        decrypt_message = base64.b64decode(message)
        chunks = [
            rsa.decrypt(decrypt_message[start:start + max_length], self.company_private_key)
            for start in range(0, len(decrypt_message), max_length)
        ]
        return b''.join(chunks)

JavaScript实现:

// Load the jsencrypt library.
const JSEncrypt = require('jsencrypt');
// Placeholder value: replace with the real PEM public key.
publickey = '公钥';
let jse = new JSEncrypt();
jse.setPublicKey(publickey);
// Encrypt the plaintext with the public key.
var encStr = jse.encrypt('username');
// Placeholder value: replace with the real PEM private key.
privatekey = '私钥';
jse.setPrivateKey(privatekey);
// Decrypt the ciphertext produced above with the private key.
var Str = jse.decrypt(encStr);

三、加密参数还原与模拟(实战)#

1. Python模拟JS#

import execjs
# JavaScript source to run: a single function that returns a constant string.
js = 'function test() { return "hello"; }'
# Compile the source with execjs, call test() and keep its return value.
xvt_anti = execjs.compile(js).call('test')

2. MD5加密逆向#

import hashlib
# Build the MD5 hash of b'12' directly and print its hexadecimal digest.
m = hashlib.md5(b'12')
print(m.hexdigest())

3. RSA加密逆向案例#

from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5
import base64
def encrypt_str(data):
    """Encrypt ``data`` with the embedded RSA public key (PKCS#1 v1.5).

    The key is stored as Base64 text and decoded before import. The text is
    UTF-8 encoded, encrypted, and returned as Base64 bytes.
    """
    key = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsgDq4OqxuEisnk2F0EJFmw4xKa5IrcqEYHvqxPs2CHEg2kolhfWA2SjNuGAHxyDDE5MLtOvzuXjBx/5YJtc9zj2xR/0moesS+Vi/xtG1tkVaTCba+TV+Y5C61iyr3FGqr+KOD4/XECu0Xky1W9ZmmaFADmZi7+6gO9wjgVpU9aLcBcw/loHOeJrCqjp7pA98hRJRY+MML8MK15mnC4ebooOva+mJlstW6t/1lghR8WNV8cocxgcHHuXBxgns2MlACQbSdJ8c6Z3RQeRZBzyjfey6JCCfbEKouVrWIUuPphBL3OANfgp0B+QG31bapvePTfXU48TYK0M5kE+8LgbbWQIDAQAB"
    public_key = RSA.import_key(base64.b64decode(key))
    encryptor = PKCS1_v1_5.new(public_key)
    ciphertext = encryptor.encrypt(data.encode(encoding="utf-8"))
    return base64.b64encode(ciphertext)


# Example: encrypt the string "11" and show the Base64 result.
password = encrypt_str("11")
print(password)

4. AES逆向#

数据加密逆向#

import requests
import base64, json, re
from Crypto.Cipher import AES
def decrypt(info: str) -> list:
    """Decrypt a Base64 AES-ECB payload and parse it as JSON.

    Args:
        info: Base64 text of the encrypted response body.

    Returns:
        The value produced by ``json.loads`` on the decrypted text.
    """
    key = '3sd&d24h@$udD2s*'.encode(encoding='utf-8')
    cipher = AES.new(key, mode=AES.MODE_ECB)
    json_str = str(cipher.decrypt(base64.b64decode(info)), encoding='utf-8')
    # Remove control characters (everything below 0x20 except tab-free
    # \n and \r, which JSON tolerates as whitespace). The class must not
    # contain '|': inside [...] it is a literal and would delete pipe
    # characters from the data itself.
    data = re.sub(r'[\x00-\x09\x0b\x0c\x0e-\x1f]', '', json_str)
    return json.loads(data)
headers = {}  # fill in the real request headers
url = "https://api.hanghangcha.com/hhc/tag"
res = requests.get(url, headers=headers)
# The encrypted payload sits under the "data" key of the JSON response body.
response_body = json.loads(res.content)
payload = response_body['data']
data = decrypt(payload)
print(data)

数据链接逆向#

import execjs
# JavaScript that rewrites the last path segment of a URL: the file name
# (before the extension) is AES-ECB encrypted with a fixed key, '/' in the
# Base64 result is replaced by '^', the last two characters are dropped, and
# the URL is reassembled.
# Raw string: the source contains regex escapes (\+ and \/) that are not valid
# Python escape sequences; the text passed to execjs is unchanged.
js = r'''
var CryptoJS = require("crypto-js");
function lx(hh) {
var aa = hh.split("/");
var aaa = aa.length;
var bbb = aa[aaa - 1].split('.');
var ccc = bbb[0];
var cccc = bbb[1];
var r = /^\+?[1-9][0-9]*$/;
var srcs = CryptoJS.enc.Utf8.parse(ccc);
var s = "qnbyzzwmdgghmcnm";
var k = CryptoJS.enc.Utf8.parse(s);
var en = CryptoJS.AES.encrypt(srcs, k, {
mode: CryptoJS.mode.ECB,
padding: CryptoJS.pad.Pkcs7
});
var ddd = en.toString();
ddd = ddd.replace(/\//g, "^");
ddd = ddd.substring(0, ddd.length - 2);
var bbbb = ddd + '.' + bbb[1];
aa[aaa - 1] = bbbb;
var uuu = '';
for (i = 0; i < aaa; i++) {
uuu += aa[i] + '/'
}
uuu = uuu.substring(0, uuu.length - 1);
return uuu;
}
'''
# Run lx() on a sample detail-page URL and print the rewritten URL.
hh = "http://ggzy.zwfwb.tj.gov.cn:80/jyxxcgjg/970369.jhtml"
url = execjs.compile(js).call('lx', hh)
print(url)

四、环境补充#

1. 手动补充环境#

基础环境#

document = {
createElement: function(x) {
return {}
}
};
var _getXxx = {
toString: function() {
return ""
}
};
_getXxx.__proto__.getA = function getA(x) {
return {}
};
window = global;
document = {referrer: ""};
location = {
hash: "",
host: "",
hostname: "",
href: "",
origin: "",
pathname: "/",
port: "",
protocol: "https:",
search: ""
};
navigator = {
appCodeName: "Mozilla",
appName: "Netscape",
appVersion: "5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36",
cookieEnabled: true,
deviceMemory: 8,
doNotTrack: null,
hardwareConcurrency: 4,
language: "zh-CN",
languages: ["zh-CN", "zh"],
maxTouchPoints: 0,
onLine: true,
platform: "Win32",
product: "Gecko",
productSub: "20030107",
userAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36",
vendor: "Google Inc.",
vendorSub: "",
};

补充document方法#

// Stub `document` with the methods this snippet provides for page scripts.
// createEvent and addEventListener do nothing. createElement returns a new
// empty object for every tag name; the x === "" branch returns the same thing
// as the fall-through and only marks where a tag-specific stub could go.
var document = {
  createEvent: function createEvent() {},
  addEventListener: function addEventListener(x) {},
  createElement: function createElement(x) {
    if (x === "") {
      return {}
    }
    return {}
  }
};

补充canvas#

// Stub `document` whose createElement returns the shared fake canvas for any
// tag name.
document = {
  createElement: function createElement(x) {
    return canvas
  }
};
// Fake canvas element: toDataURL returns a fixed placeholder string;
// getContext returns the CanvasContext stub for "2d" and an empty object for
// any other context type.
canvas = {
  toDataURL: function toDataURL() {
    return ".....ggg=="
  },
  getContext: function getContext(x) {
    if (x === "2d") {
      return CanvasContext
    } else {
      return {}
    }
  }
};
// Fake 2D context: the drawing methods are no-ops and toString returns a
// fixed string.
CanvasContext = {
  arc: function arc() {},
  stroke: function stroke() {},
  fillText: function fillText() {},
  toString: function() {
    return "[object]"
  }
};
// Makes Object.prototype.toString.call(canvas) report
// "[object HTMLCanvasElement]".
canvas[Symbol.toStringTag] = "HTMLCanvasElement";

重写String的indexOf方法#

// Keep a reference to the native implementation before replacing it.
var _indexOf = String.prototype.indexOf;
String.prototype.indexOf = function (searchValue, fromIndex) {
  // Searching for the marker 'lx' always reports "not found"; every other
  // search is forwarded to the native method unchanged.
  return searchValue === 'lx' ? -1 : _indexOf.call(this, searchValue, fromIndex);
};

重写toString方法#

// Keep a reference to the native Function.prototype.toString.
var newString = Function.prototype.toString;
// Make selected functions print like native code, each under its own name.
// The typeof guards matter: naming Window or Location directly throws a
// ReferenceError wherever those globals are not defined, which would break
// toString() for every function.
Function.prototype.toString = function () {
  if (typeof Window !== "undefined" && this === Window) {
    return "function Window() { [native code] }";
  }
  if (typeof Location !== "undefined" && this === Location) {
    return "function Location() { [native code] }";
  }
  if (this === Function.prototype.toString) {
    return "function toString() { [native code] }";
  }
  return newString.apply(this);
};

2. JSDOM环境补充#

const { JSDOM } = require('jsdom');
// jsdom does not execute <script> elements unless asked to; without
// runScripts the inline script never runs and window.get_signature stays
// undefined. "dangerously" runs the page's scripts inside the jsdom window,
// so only load markup you trust.
const dom = new JSDOM(`
<!DOCTYPE html>
<html>
<body>
<script>
// 模拟环境
window = {
document: {
location: {
href: 'https://example.com'
}
}
};
// 你的JS代码
function get_signature(url) {
return "signature";
}
</script>
</body>
</html>
`, { runScripts: "dangerously" });
const window = dom.window;
const document = window.document;
// Call the function defined by the inline script.
const signature = window.get_signature('https://example.com');
console.log(signature);

3. Selenium环境模拟#

# -*- coding: utf-8 -*-
import os
from selenium import webdriver
ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36'
# Directory of this script; the generated HTML page is written and read here.
PRO_DIR = os.path.dirname(os.path.abspath(__file__))
# HTML written before and after the signing script.
s1 = """
<!DOCTYPE html>
<html style="font-size: 50px;">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>signature-hook</title>
</head>
<body></body>
<script type="text/javascript">
"""
s2 = """
</script>
</html>
"""


def driver_sig(html_file):
    """Open the generated page in headless Chrome and return its title.

    Args:
        html_file: File name of the page, relative to ``PRO_DIR``. A leading
            path separator is ignored.

    Returns:
        The page title once the page has loaded.
    """
    from pathlib import Path

    option = webdriver.ChromeOptions()
    option.add_argument('headless')
    option.add_argument('--no-sandbox')
    option.add_argument('--user-agent={}'.format(ua))
    # `options=` replaces the deprecated `chrome_options=` keyword.
    driver = webdriver.Chrome(options=option)
    try:
        # Build a proper file:// URL; plain string concatenation dropped the
        # separator between the directory and the file name.
        page_path = Path(PRO_DIR, html_file.lstrip('/\\'))
        driver.get(page_path.as_uri())
        return driver.title
    finally:
        # Always shut the browser down, even when loading fails.
        driver.quit()


# Braces that belong to the JavaScript are doubled so that str.format only
# substitutes the user agent. The signature is also copied into the page
# title, because that is the value driver_sig() reads back.
sign_js = '''
window.navigator = {{
userAgent: '{}'
}};
function get_sign() {{
// 模拟签名逻辑
return "signature";
}}
var signature = get_sign();
document.clear();
document.write(signature);
document.title = signature;
'''.format(ua)

if __name__ == '__main__':
    html_file = 'get_sign.html'
    # Write the page next to this script, which is where driver_sig() loads it
    # from, rather than into the current working directory.
    with open(os.path.join(PRO_DIR, html_file), 'w', encoding='utf-8') as fw:
        fw.write(s1 + sign_js + s2)
    sig = driver_sig(html_file)
    print(sig)

五、浏览器环境监测#

1. Proxy-intercept#

window = new Proxy(global, {
get: function(target, key, receiver) {
console.log("window.get", key, target[key]);
if (key === "location") {
location = new Proxy(target[key], {
get: function(_target, key, _receiver) {
console.log("window.get", key, _target[key]);
if (key === "port") {
console.log("公众号【Pythonxy】");
}
return _target[key];
}
});
}
return target[key];
},
set: function(target, key, value, receiver) {
console.log("window.set", key, value);
target[key] = value;
}
});

六、自动化工具#

1. Selenium#

常用操作#

| 操作 | 代码 | 说明 |
| --- | --- | --- |
| 获取标题 | `driver.title` | 获取当前页面标题 |
| 获取URL | `driver.current_url` | 获取当前页面URL |
| 获取源码 | `driver.page_source` | 获取当前页面源码 |
| 窗口最大化 | `driver.maximize_window()` | 浏览器窗口最大化 |
| 设置窗口大小 | `driver.set_window_size(480, 480)` | 设置浏览器窗口宽高 |
| 刷新页面 | `driver.refresh()` | 刷新当前页面 |
| 页面后退 | `driver.back()` | 浏览器后退 |
| 页面前进 | `driver.forward()` | 浏览器前进 |
| 截图 | `driver.get_screenshot_as_file('screenshot.png')` | 截屏保存为文件 |

元素定位方法#

| 定位方式 | 代码 | 说明 |
| --- | --- | --- |
| 标签定位 | `driver.find_element_by_tag_name('div')` | 通过标签名定位 |
| 文字定位 | `driver.find_element_by_link_text('百度')` | 通过链接文字定位 |
| 部分文字定位 | `driver.find_element_by_partial_link_text('百')` | 通过部分链接文字定位 |
| XPath定位 | `driver.find_element_by_xpath('//div[@id="kw"]')` | 通过XPath定位 |
| Class定位 | `driver.find_element_by_class_name('input')` | 通过Class定位 |
| ID定位 | `driver.find_element(By.ID, 'kw')` | 通过ID定位 |

元素操作#

| 操作 | 代码 | 说明 |
| --- | --- | --- |
| 单击 | `driver.find_element(By.ID, 'kw').click()` | 点击元素 |
| 输入 | `driver.find_element(By.ID, 'kw').send_keys('selenium')` | 在元素中输入文本 |
| 提交 | `driver.find_element(By.ID, 'su').submit()` | 提交表单 |
| 清空 | `driver.find_element(By.ID, 'kw').clear()` | 清空输入框 |

鼠标事件#

from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By

action = ActionChains(driver)
# find_element(By.ID, ...) replaces find_element_by_id, which current
# Selenium 4 releases no longer provide.
kw = driver.find_element(By.ID, 'kw')
action.context_click(kw).perform()  # right-click the element

键盘事件#

from selenium.webdriver.common.keys import Keys
# Send Ctrl+A (select all), Ctrl+X (cut) and Ctrl+V (paste), in that order,
# looking the input box up again before each shortcut.
for shortcut_key in ('a', 'x', 'v'):
    driver.find_element(By.ID, 'kw').send_keys(Keys.CONTROL, shortcut_key)

窗口句柄处理#

from selenium.webdriver.common.by import By

now_handle = driver.current_window_handle  # handle of the current window
# find_element(By.LINK_TEXT, ...) replaces find_element_by_link_text, which
# current Selenium 4 releases no longer provide.
driver.find_element(By.LINK_TEXT, '登录').click()  # click the login link
# current_window_handle keeps pointing at the original window until the driver
# is switched, so the new window is the handle that was not there before.
new_handle = next(handle for handle in driver.window_handles if handle != now_handle)
driver.switch_to.window(now_handle)  # switch (back) to the original window
driver.switch_to.window(new_handle)  # switch to the new window

处理iframe#

from selenium.webdriver.common.by import By

# find_element(By.ID, ...) replaces find_element_by_id, which current
# Selenium 4 releases no longer provide.
frame_reference = driver.find_element(By.ID, 'frame_id')
driver.switch_to.frame(frame_reference)  # switch into the iframe
driver.switch_to.default_content()  # return to the main document

2. Pyppeteer#

Pyppeteer是Puppeteer的Python版本,用于浏览器自动化。

安装#

Terminal window
pip install pyppeteer

基本使用#

import asyncio
from pyppeteer import launch
async def main():
    """Open a page in a visible browser and save a screenshot of it."""
    browser = await launch(headless=False)
    try:
        page = await browser.newPage()
        await page.goto('https://example.com')
        await page.screenshot({'path': 'example.png'})
    finally:
        # Close the browser even when navigation or the screenshot fails.
        await browser.close()


# asyncio.run creates and closes the event loop itself; calling
# get_event_loop() without a running loop is deprecated in recent Python.
asyncio.run(main())

处理JavaScript#

# Run JavaScript in the page: fill the #search field, then click #submit.
# This fragment uses `await`, so it belongs inside a coroutine that has `page`.
await page.evaluate('''() => {
document.querySelector('#search').value = 'pyppeteer';
document.querySelector('#submit').click();
}''')

处理弹窗#

async def handle_alert(page):
    """Accept every dialog the page raises, then open the example page.

    Args:
        page: The page object to register the dialog handler on and navigate.
    """
    import asyncio

    # Event handlers are called synchronously, while dialog.accept() returns a
    # coroutine; it has to be scheduled on the loop or it would never run.
    page.on('dialog', lambda dialog: asyncio.ensure_future(dialog.accept()))
    await page.goto('https://example.com')

七、反爬虫策略应对#

1. Selenium绕过检测#

from selenium.webdriver import Chrome
from selenium.webdriver import ChromeOptions
# 79年之前
# Before version 79 (the source note says "year 79", which looks like a typo
# for "version"): exclude the "enable-automation" switch.
option = ChromeOptions()
option.add_experimental_option('excludeSwitches', ['enable-automation'])
driver = Chrome(options=option)
# From version 79 on: register a script that runs on every new document and
# makes navigator.webdriver read as false.
driver = Chrome()
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
    "source": """Object.defineProperty(navigator, 'webdriver', {get: () => false})"""
})
# From version 88 on: disable the AutomationControlled Blink feature.
chrome_options = ChromeOptions()
chrome_options.add_argument("disable-blink-features")
chrome_options.add_argument("disable-blink-features=AutomationControlled")
driver = Chrome(options=chrome_options)

2. 检测浏览器特征#

常见的检测特征包括:

  • navigator.webdriver
  • navigator.languages
  • navigator.userAgent
  • navigator.vendor
  • window.chrome
  • window.navigator.plugins
  • window.screen

八、RPC数据获取(示例)#

1. 定位关键点#

在JS中,new WebSocket()是固定语法,可用于定位关键词。

// Message handler of the WebSocket `ws`: the payload of each incoming message
// is available as event.data.
ws.onmessage = function(event) {
  var data = event.data;
};

2. RPC调用示例#

import asyncio
import websockets
async def check_permit(websocket):
    """Send the fixed greeting ``'lx'`` to the peer and return ``True``."""
    await websocket.send('lx')
    return True
async def recv_msg(websocket):
    """Print every message received on ``websocket`` until it is closed.

    Iterating the connection yields incoming messages and stops when the peer
    closes the connection normally, so the handler returns instead of raising
    out of an endless ``recv()`` loop.
    """
    async for recv_text in websocket:
        print(recv_text)
async def main_logic(websocket, path=None):
    """Connection handler: send the greeting, then print incoming messages.

    Args:
        websocket: The connection passed in by the server.
        path: Request path. Optional, because newer websockets releases call
            the handler with the connection only; it is not used here.
    """
    await check_permit(websocket)
    await recv_msg(websocket)
async def run_server(host='127.0.0.1', port=9999):
    """Serve ``main_logic`` on ``host:port`` until the task is cancelled."""
    async with websockets.serve(main_logic, host, port):
        # Waiting on a future that never completes keeps the server running.
        await asyncio.Future()


# asyncio.run creates and closes the event loop itself; calling
# get_event_loop() without a running loop is deprecated in recent Python.
asyncio.run(run_server())

3. 使用示例#

import websocket
import json
def on_message(ws, message):
    """Print a received text message with a ``Received: `` prefix."""
    line = "Received: " + message
    print(line)
def on_error(ws, error):
    """Print the string form of an error with an ``Error: `` prefix."""
    print(f"Error: {error!s}")
def on_close(ws, close_status_code, close_msg):
    """Print a fixed notice when the connection closes; arguments are unused."""
    notice = "Connection closed"
    print(notice)
def on_open(ws):
    """Send the JSON login message as soon as the connection opens."""
    credentials = {"username": "user", "password": "pass"}
    login_message = {"type": "login", "data": credentials}
    ws.send(json.dumps(login_message))
if __name__ == "__main__":
    # Wire the four callbacks to the client, then block in its event loop.
    callbacks = {
        "on_open": on_open,
        "on_message": on_message,
        "on_error": on_error,
        "on_close": on_close,
    }
    ws = websocket.WebSocketApp("wss://example.com/socket", **callbacks)
    ws.run_forever()
爬虫逆向(爬虫逆向进阶实战总结)
https://fuwari.vercel.app/posts/crawler/base/baseinfo/
作者
江湖一条鱼
发布于
2023-08-03
许可协议
CC BY-NC-SA 4.0