要怎麼檢查網頁上的某個事件是人為觸發,還是透過 JavaScript / jQuery 觸發?答案是檢查 event.which。那要怎麼在 JavaScript 裡產生帶有 event.which 的事件?光用 JavaScript 是無法做到的,因為基於安全性的考量,這種做法會被瀏覽器阻擋。
解法:透過 selenium 或 nodriver 送出模擬人類的點擊。
詳細的修改方式,參考看看這一個 commit:
https://github.com/max32002/ddddext/commit/dc8d20635529c7241b9978a299c746d37783d123
Step 1: 要讓 chrome extension 是被 selenium / nodriver 等工具載入。
undetected chromedriver:
# Pass the extension directory to Chrome through the --load-extension switch,
# then start the browser with those options.
options.add_argument('--load-extension=' + extension_path)
driver = uc.Chrome(options=options)
nodriver:
# Register the extension directory on the config object, then start the
# browser with that config.
conf = Config()
conf.add_extension(extension_path)
driver = await uc.start(conf)
Step 2: 產生一個 local web server 來收集 extension 發送的指令。
sample source code:
async def main_server():
    """Start the local web server that receives commands from the extension.

    An OCR engine is created on a best-effort basis; if its construction
    fails the error is printed and the server still starts with
    ``app.ocr`` set to ``None``.  The coroutine then waits on an event that
    is never set, so it only ends when the task is cancelled.
    """
    ocr = None
    try:
        ocr = ddddocr.DdddOcr(show_ad=False, beta=True)
    except Exception as exc:
        # Deliberately non-fatal: the other routes remain usable without OCR.
        print(exc)

    app = Application([
        ("/", HomepageHandler),
        ("/sendkey", SendkeyHandler),
        ("/ocr", OcrHandler),
        # Any other path is served as a static file from ./www/.
        ('/(.*)', StaticFileHandler, {"path": os.path.join(".", 'www/')}),
    ])
    # Attach the OCR engine to the application object.
    # NOTE(review): presumably read by OcrHandler -- confirm against the handler.
    app.ocr = ocr
    app.listen(CONST_SERVER_PORT)
    print("server running on port:", CONST_SERVER_PORT)

    # Block forever so the server keeps running.
    await asyncio.Event().wait()
Step 3: 在 extension javascript 裡透過 background.js 送出 http request 給 local web server, 傳出的內容是 selector 字串。
webpage.js:
/**
 * Return the first remote URL configured in `settings.advanced.remote_url`.
 *
 * The setting is treated as a comma-separated list of JSON string literals
 * (for example '"http://127.0.0.1:16888/"'): it is wrapped in brackets and
 * parsed as a JSON array, and the first entry is returned.
 *
 * @param {object} settings - extension settings object; may be null/undefined.
 * @returns {string} the first configured URL, or "" when the setting is
 *   missing, empty, not valid JSON, or its first entry is not a string.
 */
function get_remote_url(settings) {
    if (!settings || !settings.advanced) {
        return "";
    }

    const raw_list = settings.advanced.remote_url;
    if (typeof raw_list !== "string" || raw_list.length === 0) {
        return "";
    }

    let remote_url_array;
    try {
        remote_url_array = JSON.parse('[' + raw_list + ']');
    } catch (error) {
        // A malformed setting is treated as "no remote URL" instead of
        // throwing into every caller.
        return "";
    }

    // Callers invoke string methods on the result, so only a string is returned.
    const first_url = remote_url_array[0];
    return (typeof first_url === "string") ? first_url : "";
}
/**
 * Send a list of commands to the local web server via background.js.
 *
 * Nothing is sent unless the configured remote URL contains "127.0.0.".
 * The commands are wrapped as {token, command} JSON and handed to the
 * background script as a 'post' action aimed at `<remote url>sendkey`.
 *
 * @param {Array<object>} commands - command objects ({type, selector, ...}).
 * @returns {Promise<void>} settles once the background message has been sent.
 */
async function _webdriver_post_commands(commands) {
    const api_url = get_remote_url(settings);
    if (api_url.indexOf("127.0.0.") === -1) {
        return;
    }

    const body = JSON.stringify({
        token: settings.token,
        command: commands
    });
    const bundle = {
        action: 'post',
        data: {
            'url': api_url + 'sendkey',
            'post_data': body,
        }
    };
    await chrome.runtime.sendMessage(bundle);
}

/**
 * Request that `answer` be typed into the element matching `selector`.
 *
 * @param {string} selector - CSS selector of the target element.
 * @param {string} answer - text to type.
 */
async function webdriver_sendkey(selector, answer) {
    await _webdriver_post_commands([
        {type: 'sendkey', selector: selector, text: answer}
    ]);
}

/**
 * Same as webdriver_sendkey, with an extra `location` field in the command.
 *
 * @param {string} selector - CSS selector of the target element.
 * @param {string} answer - text to type.
 * @param {*} location - forwarded unchanged as the command's `location`.
 */
async function webdriver_location_sendkey(selector, answer, location) {
    await _webdriver_post_commands([
        {type: 'sendkey', selector: selector, text: answer, location: location}
    ]);
}

/**
 * Request a click on the element matching `selector`.
 *
 * @param {string} selector - CSS selector of the target element.
 */
async function webdriver_click(selector) {
    await _webdriver_post_commands([
        {type: 'click', selector: selector}
    ]);
}

/**
 * Same as webdriver_click, with an extra `location` field in the command.
 *
 * @param {string} selector - CSS selector of the target element.
 * @param {*} location - forwarded unchanged as the command's `location`.
 */
async function webdriver_location_click(selector, location) {
    await _webdriver_post_commands([
        {type: 'click', selector: selector, location: location}
    ]);
}
上面是副程式,主程式:
// Ask the local web server to click the element matching this CSS selector.
const selector="#your_button_id";
webdriver_click(selector);
說明: 原本的一行 $("#your_button_id").click(); 換成上面程式碼即可。
background.js:
/**
 * POST `image_data` as JSON to `data_url` and forward the parsed JSON reply
 * to the tab that asked for it.
 *
 * Failures (network error, non-OK status, invalid JSON) are logged and
 * nothing is sent to the tab.
 *
 * @param {string} data_url - endpoint that receives the request.
 * @param {*} image_data - value sent as the `image_data` field of the body.
 * @param {number} tabId - tab that receives the reply via chrome.tabs.sendMessage.
 * @returns {Promise<void>} settles after the request has been handled.
 */
async function ocr(data_url, image_data, tabId)
{
    try {
        const response = await fetch(data_url, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({
                image_data: image_data
            })
        });

        if (!response.ok) {
            const reason = (response.status === 404)
                ? 'error 404'
                : 'some other error: ' + response.status;
            console.log('error is', reason);
            return;
        }

        const result_json = await response.json();
        if (result_json) {
            console.log(result_json);
            chrome.tabs.sendMessage(tabId, result_json);
        }
    } catch (error) {
        // Previously swallowed silently; log it so failures can be diagnosed.
        console.log('error is', error);
    }
}
/**
 * POST an already-serialized JSON body to `data_url` and forward the parsed
 * JSON reply to the tab that asked for it.
 *
 * Failures (network error, any non-OK status, invalid JSON) are logged and
 * nothing is sent to the tab.
 *
 * @param {string} data_url - endpoint that receives the request.
 * @param {string} post_body - request body, sent as-is.
 * @param {number} tabId - tab that receives the reply via chrome.tabs.sendMessage.
 * @returns {Promise<void>} settles after the request has been handled.
 */
async function post(data_url, post_body, tabId)
{
    try {
        const response = await fetch(data_url, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: post_body
        });

        if (!response.ok) {
            // Statuses other than 404 used to be dropped without any trace.
            const reason = (response.status === 404)
                ? 'error 404'
                : 'some other error: ' + response.status;
            console.log('error is', reason);
            return;
        }

        const result_json = await response.json();
        if (result_json) {
            chrome.tabs.sendMessage(tabId, result_json);
        }
    } catch (error) {
        console.log('error is', error);
    }
}
// Store an empty last_reload_timestamp list in chrome.storage.local when this
// script loads (original note: this is to avoid overheating).
chrome.storage.local.set(
{
last_reload_timestamp: []
}
);
// Dispatch messages from content scripts by their `action` field:
//   "ocr"    -> forward the image to the OCR endpoint,
//   "post"   -> forward a prepared JSON body,
//   "status" -> send a status object back to the sending tab.
// Replies go out through chrome.tabs.sendMessage; sendResponse is not used.
chrome.runtime.onMessage.addListener((request, sender, sendResponse) => {
    const action = request.action;

    if (action == "ocr") {
        const senderTabId = sender.tab.id;
        ocr(request.data.url, request.data.image_data, senderTabId);
    } else if (action == "post") {
        const senderTabId = sender.tab.id;
        post(request.data.url, request.data.post_data, senderTabId);
    } else if (action == "status") {
        // NOTE(review): `answer` is not defined inside this listener; it must
        // come from an outer scope that is not visible here -- confirm.
        const statusReply = {"status": answer};
        const senderTabId = sender.tab.id;
        chrome.tabs.sendMessage(senderTabId, statusReply);
    }
});
/**
 * POST `image_data` as JSON to `data_url` and forward the parsed JSON reply
 * to the tab that asked for it.
 *
 * Failures (network error, non-OK status, invalid JSON) are logged and
 * nothing is sent to the tab.
 *
 * @param {string} data_url - endpoint that receives the request.
 * @param {*} image_data - value sent as the `image_data` field of the body.
 * @param {number} tabId - tab that receives the reply via chrome.tabs.sendMessage.
 * @returns {Promise<void>} settles after the request has been handled.
 */
async function ocr(data_url, image_data, tabId)
{
    try {
        const response = await fetch(data_url, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({
                image_data: image_data
            })
        });

        if (!response.ok) {
            const reason = (response.status === 404)
                ? 'error 404'
                : 'some other error: ' + response.status;
            console.log('error is', reason);
            return;
        }

        const result_json = await response.json();
        if (result_json) {
            console.log(result_json);
            chrome.tabs.sendMessage(tabId, result_json);
        }
    } catch (error) {
        // Previously swallowed silently; log it so failures can be diagnosed.
        console.log('error is', error);
    }
}
/**
 * POST an already-serialized JSON body to `data_url` and forward the parsed
 * JSON reply to the tab that asked for it.
 *
 * Failures (network error, any non-OK status, invalid JSON) are logged and
 * nothing is sent to the tab.
 *
 * @param {string} data_url - endpoint that receives the request.
 * @param {string} post_body - request body, sent as-is.
 * @param {number} tabId - tab that receives the reply via chrome.tabs.sendMessage.
 * @returns {Promise<void>} settles after the request has been handled.
 */
async function post(data_url, post_body, tabId)
{
    try {
        const response = await fetch(data_url, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: post_body
        });

        if (!response.ok) {
            // Statuses other than 404 used to be dropped without any trace.
            const reason = (response.status === 404)
                ? 'error 404'
                : 'some other error: ' + response.status;
            console.log('error is', reason);
            return;
        }

        const result_json = await response.json();
        if (result_json) {
            console.log(result_json);
            chrome.tabs.sendMessage(tabId, result_json);
        }
    } catch (error) {
        console.log('error is', error);
    }
}
Step 4: local web server 送出 click / send_key 等指令,給存取中的網頁。
selenium:
def sendkey_to_browser(driver, config_dict):
    """Run the pending command file for this session, if one exists.

    The file is looked up as ``<app root>/<token>.tmp`` where the token comes
    from ``config_dict["token"]``.  Without a token, or when the file does
    not exist, nothing happens.
    """
    pending_path = ""
    if "token" in config_dict:
        pending_path = os.path.join(
            util.get_app_root(),
            config_dict["token"] + ".tmp",
        )

    if not os.path.exists(pending_path):
        return
    sendkey_to_browser_exist(driver, pending_path)
def sendkey_to_browser_exist(driver, tmp_filepath):
    """Replay the commands stored in *tmp_filepath* through Selenium.

    The file is expected to hold a JSON object whose ``"command"`` entry is a
    list of ``{"type": "sendkey" | "click", "selector": ..., "text": ...}``
    dicts.  Each command is executed against ``driver``; the file is deleted
    only when every command succeeded, so a failed run is retried the next
    time this function is called.

    Args:
        driver: Selenium WebDriver used to locate and operate the elements.
        tmp_filepath: path of the JSON command file.

    Returns:
        None.  Errors are printed, never raised for a malformed command.
    """
    try:
        with open(tmp_filepath) as json_data:
            sendkey_dict = json.load(json_data)
        print(sendkey_dict)
    except (OSError, ValueError) as e:
        # Unreadable or invalid JSON: leave the file alone.
        print("error on open file")
        print(e)
        return

    if not sendkey_dict:
        return

    commands = []
    if isinstance(sendkey_dict, dict):
        commands = sendkey_dict.get("command") or []

    all_command_done = True
    for cmd_dict in commands:
        # Entries without a usable "type" are ignored, like unknown types.
        cmd_type = cmd_dict.get("type") if isinstance(cmd_dict, dict) else None

        if cmd_type == "sendkey":
            print("sendkey")
            try:
                # Read inside the try so a missing key is reported as a
                # failed command rather than raising out of the loop.
                target_text = cmd_dict["text"]
                form_input = driver.find_element(By.CSS_SELECTOR, cmd_dict["selector"])
                # Only retype when the field does not already hold the text.
                if form_input.get_attribute('value') != target_text:
                    form_input.clear()
                    form_input.click()
                    form_input.send_keys(target_text)
            except Exception as exc:
                all_command_done = False
                print("error on sendkey")
                print(exc)
        elif cmd_type == "click":
            print("click")
            try:
                form_input = driver.find_element(By.CSS_SELECTOR, cmd_dict["selector"])
                form_input.click()
            except Exception as exc:
                all_command_done = False
                print("error on click")
                print(exc)

        # Short pause between consecutive commands.
        time.sleep(0.05)

    # must all command success to delete tmp file.
    if all_command_done:
        try:
            os.unlink(tmp_filepath)
        except OSError as e:
            # Best effort: the file may already be gone.
            print(e)
nodriver:
async def sendkey_to_browser(tab, config_dict):
    """Run the pending command file for this session, if one exists.

    The file is looked up as ``<app root>/<token>.tmp`` where the token comes
    from ``config_dict["token"]``.  Without a token, or when the file does
    not exist, nothing happens.
    """
    pending_path = ""
    if "token" in config_dict:
        pending_path = os.path.join(
            util.get_app_root(),
            config_dict["token"] + ".tmp",
        )

    if not os.path.exists(pending_path):
        return
    await sendkey_to_browser_exist(tab, pending_path)
async def sendkey_to_browser_exist(tab, tmp_filepath):
    """Replay the commands stored in *tmp_filepath* through a nodriver tab.

    The file is expected to hold a JSON object whose ``"command"`` entry is a
    list of ``{"type": "sendkey" | "click", "selector": ..., "text": ...}``
    dicts.  Each command is executed against ``tab``; the file is deleted
    only when no command raised, so a failed run is retried the next time
    this coroutine is called.

    Args:
        tab: nodriver tab used to locate and operate the elements.
        tmp_filepath: path of the JSON command file.

    Returns:
        None.  Errors are printed, never raised for a malformed command.
    """
    # Local import keeps this block self-contained.
    import asyncio

    try:
        with open(tmp_filepath) as json_data:
            sendkey_dict = json.load(json_data)
        print(sendkey_dict)
    except (OSError, ValueError) as e:
        # Unreadable or invalid JSON: leave the file alone.
        print("error on open file")
        print(e)
        return

    if not sendkey_dict:
        return

    commands = []
    if isinstance(sendkey_dict, dict):
        commands = sendkey_dict.get("command") or []

    all_command_done = True
    for cmd_dict in commands:
        # Entries without a usable "type" are ignored, like unknown types.
        cmd_type = cmd_dict.get("type") if isinstance(cmd_dict, dict) else None

        if cmd_type == "sendkey":
            print("sendkey")
            try:
                # Read inside the try so a missing key is reported as a
                # failed command rather than raising out of the loop.
                target_text = cmd_dict["text"]
                element = await tab.query_selector(cmd_dict["selector"])
                # NOTE(review): a missing element is not counted as a
                # failure, so the file is still deleted -- confirm intended.
                if element:
                    await element.click()
                    # Clear the current value before typing the new text.
                    await element.apply('function (element) {element.value = ""; } ')
                    await element.send_keys(target_text)
            except Exception as e:
                all_command_done = False
                print(e)
        elif cmd_type == "click":
            print("click")
            try:
                element = await tab.query_selector(cmd_dict["selector"])
                if element:
                    await element.click()
            except Exception as e:
                all_command_done = False
                print(e)

        # Non-blocking pause: time.sleep() here would stall the event loop.
        await asyncio.sleep(0.05)

    # must all command success to delete tmp file.
    if all_command_done:
        try:
            os.unlink(tmp_filepath)
        except OSError as e:
            # Best effort: the file may already be gone.
            print(e)