要怎麼讓 chrome extension 裡的 javascript/jquery 的 click 可以比照真人去點擊?

網頁要怎麼檢查某一個事件是人為觸發,還是透過 javascript / jquery 觸發?答案是檢查 event.which。那要怎麼在 javascript 裡產生出 event.which?光靠 javascript 是無法做到的,因為有安全性的問題,這種做法會被瀏覽器阻擋。

解法:透過 selenium 或 nodriver 送出模擬人類的點擊。

詳細的修改方式,參考看看這一個 commit:
https://github.com/max32002/ddddext/commit/dc8d20635529c7241b9978a299c746d37783d123


Step 1: 讓 chrome extension 由 selenium / nodriver 等工具載入。

undetected chromedriver:

# Build the Chrome command-line flag that loads the extension found at
# extension_path, then start the undetected-chromedriver browser with it.
load_extension_arg = '--load-extension=' + extension_path
options.add_argument(load_extension_arg)
driver = uc.Chrome(options=options)

nodriver:

# Register the extension directory on a fresh Config before the browser starts.
conf = Config()
conf.add_extension(extension_path)
# Start the browser with that configuration. `await` means this line has to
# run inside a coroutine; `uc` is presumably the nodriver module alias —
# confirm against the file's imports.
driver = await uc.start(conf)

Step 2: 產生一個 local web server 來收集 extension 發送的指令。

sample source code:

async def main_server():
    """Start the local web server that collects commands from the extension.

    Routes "/", "/sendkey" and "/ocr" to their handlers and serves every other
    path as a static file from ``./www/``. One ddddocr engine is created up
    front and attached to the application as ``app.ocr``; if it cannot be
    created the error is printed and ``app.ocr`` stays ``None``.

    The coroutine never returns: after the server starts listening on
    ``CONST_SERVER_PORT`` it waits on an event that is never set.
    """
    ocr = None
    try:
        ocr = ddddocr.DdddOcr(show_ad=False, beta=True)
    except Exception as exc:
        # Best effort: the server is still started without an OCR engine.
        print(exc)

    app = Application([
        ("/", HomepageHandler),
        ("/sendkey", SendkeyHandler),
        ("/ocr", OcrHandler),
        ('/(.*)', StaticFileHandler, {"path": os.path.join(".", 'www/')}),
    ])
    # Expose the shared OCR engine (or None) to code that holds the app.
    app.ocr = ocr
    app.listen(CONST_SERVER_PORT)
    print("server running on port:", CONST_SERVER_PORT)
    # Keep the coroutine (and therefore the server) alive indefinitely.
    await asyncio.Event().wait()


Step 3: 在 extension javascript 裡透過 background.js 送出 http request 給 local web server, 傳出的內容是 selector 字串。

webpage.js:

/**
 * Return the first remote URL configured in settings.advanced.remote_url.
 *
 * The setting is expected to be a string holding a comma-separated list of
 * JSON string literals; it is wrapped in "[" and "]" and parsed as a JSON
 * array, and the first entry is returned.
 *
 * @param {object} settings Extension settings; may be null or incomplete.
 * @returns {string} The first configured URL, or "" when settings are
 *     missing, the list is empty or malformed, or the first entry is not a
 *     string (callers call .indexOf() on the result).
 */
function get_remote_url(settings) {
    const raw_list = settings && settings.advanced && settings.advanced.remote_url;
    if (typeof raw_list !== "string" || raw_list.length === 0) {
        return "";
    }

    let remote_url_array;
    try {
        remote_url_array = JSON.parse('[' + raw_list + ']');
    } catch (error) {
        // A malformed setting must not break every caller; report and fall back.
        console.log("invalid remote_url setting:", error);
        return "";
    }

    const first_url = remote_url_array[0];
    return (typeof first_url === "string") ? first_url : "";
}

/**
 * Wrap one command in the request envelope and hand it to the background
 * script through chrome.runtime.sendMessage with action 'post'.
 *
 * Nothing is sent unless the configured remote URL contains "127.0.0.".
 * The request targets the "sendkey" path of that URL and carries the
 * settings token plus a one-element command list.
 *
 * @param {object} command Command object, e.g. {type: 'click', selector: '#id'}.
 * @returns {Promise<void>} Resolves once sendMessage has settled.
 */
async function _webdriver_post_command(command) {
    const api_url = get_remote_url(settings);
    if (api_url.indexOf("127.0.0.") === -1) {
        return;
    }

    const body = JSON.stringify({
        token: settings.token,
        command: [command]
    });

    const bundle = {
        action: 'post',
        data: {
            'url': api_url + 'sendkey',
            'post_data': body,
        }
    };
    await chrome.runtime.sendMessage(bundle);
}

/**
 * Request that `answer` be typed into the element matching `selector`.
 *
 * @param {string} selector Selector of the target element.
 * @param {string} answer   Text to type.
 */
async function webdriver_sendkey(selector, answer) {
    await _webdriver_post_command({type: 'sendkey', selector: selector, text: answer});
}

/**
 * Same as webdriver_sendkey, with an extra `location` field in the command.
 *
 * @param {string} selector Selector of the target element.
 * @param {string} answer   Text to type.
 * @param {*} location      Forwarded unchanged as the command's "location"
 *     field; its meaning is defined by the receiver (not visible here).
 */
async function webdriver_location_sendkey(selector, answer, location) {
    await _webdriver_post_command({type: 'sendkey', selector: selector, text: answer, location: location});
}

/**
 * Request a click on the element matching `selector`.
 *
 * @param {string} selector Selector of the target element.
 */
async function webdriver_click(selector) {
    await _webdriver_post_command({type: 'click', selector: selector});
}

/**
 * Same as webdriver_click, with an extra `location` field in the command.
 *
 * @param {string} selector Selector of the target element.
 * @param {*} location      Forwarded unchanged as the command's "location"
 *     field; its meaning is defined by the receiver (not visible here).
 */
async function webdriver_location_click(selector, location) {
    await _webdriver_post_command({type: 'click', selector: selector, location: location});
}

上面是副程式,主程式:

// Selector of the element to click (placeholder id; replace with the real one).
const selector="#your_button_id";
// Forward the click request; webdriver_click is async and its promise is not awaited here.
webdriver_click(selector);

說明: 原本的一行 $("#your_button_id").click(); 換成上面程式碼即可。

background.js:

/**
 * POST an image to the OCR endpoint and relay the JSON reply to a tab.
 *
 * Any failure (network error, non-OK status, invalid JSON) is logged instead
 * of being silently dropped; nothing is sent to the tab in that case.
 *
 * @param {string} data_url   URL of the OCR endpoint.
 * @param {*} image_data      Image payload, sent as the "image_data" field of the JSON body.
 * @param {number} tabId      Tab that receives the parsed reply via chrome.tabs.sendMessage.
 * @returns {Promise<void>} Resolves after the request has been handled.
 */
async function ocr(data_url, image_data, tabId)
{
    try
    {
        const response = await fetch(data_url, {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json'
            },
            body: JSON.stringify({
              image_data: image_data
            })
        });

        if (!response.ok)
        {
            const reason = (response.status === 404)
                ? 'error 404'
                : 'some other error: ' + response.status;
            throw new Error(reason);
        }

        const data = await response.json();
        if (data)
        {
            console.log(data);
            chrome.tabs.sendMessage(tabId, data);
        }
    }
    catch (error)
    {
        console.log('error is', error);
    }
}

/**
 * POST an already-serialized JSON body to a URL and relay the JSON reply to a tab.
 *
 * Every non-OK status is treated as an error (not only 404), matching ocr();
 * errors are logged and nothing is sent to the tab.
 *
 * @param {string} data_url  Target URL.
 * @param {string} post_body Request body, sent as-is with a JSON content type.
 * @param {number} tabId     Tab that receives the parsed reply via chrome.tabs.sendMessage.
 * @returns {Promise<void>} Resolves after the request has been handled.
 */
async function post(data_url, post_body, tabId)
{
    try
    {
        const response = await fetch(data_url, {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json'
            },
            body: post_body
        });

        if (!response.ok)
        {
            const reason = (response.status === 404)
                ? 'error 404'
                : 'some other error: ' + response.status;
            throw new Error(reason);
        }

        const data = await response.json();
        if (data)
        {
            chrome.tabs.sendMessage(tabId, data);
        }
    }
    catch (error)
    {
        console.log('error is', error)
    }
}

// Reset the stored last_reload_timestamp list to an empty array when this
// script is evaluated. (Original note: "for avoid overheat" — presumably the
// list feeds a reload throttle; confirm against the code that reads it.)
chrome.storage.local.set({ last_reload_timestamp: [] });


// Dispatch messages sent with chrome.runtime.sendMessage. Replies are not
// returned through sendResponse; they are pushed back to the sending tab
// with chrome.tabs.sendMessage.
chrome.runtime.onMessage.addListener((request, sender, sendResponse) => {
    // sender.tab is only set when the message comes from a tab; without a
    // tab there is nowhere to deliver the reply, so ignore the message
    // instead of throwing on sender.tab.id.
    if (!sender.tab) {
        return;
    }
    const tabId = sender.tab.id;
    const request_json = request;

    if (request_json.action === "ocr") {
        ocr(request_json.data.url, request_json.data.image_data, tabId);
    }
    else if (request_json.action === "post") {
        post(request_json.data.url, request_json.data.post_data, tabId);
    }
    else if (request_json.action === "status") {
        // NOTE(review): `answer` is not defined in the visible part of this
        // file; unless it is a global declared elsewhere this line throws a
        // ReferenceError. Confirm what the status reply should contain.
        const result_json = {"status": answer};
        chrome.tabs.sendMessage(tabId, result_json);
    }
});

/**
 * POST an image to the OCR endpoint and relay the JSON reply to a tab.
 *
 * Any failure (network error, non-OK status, invalid JSON) is logged instead
 * of being silently dropped; nothing is sent to the tab in that case.
 *
 * @param {string} data_url   URL of the OCR endpoint.
 * @param {*} image_data      Image payload, sent as the "image_data" field of the JSON body.
 * @param {number} tabId      Tab that receives the parsed reply via chrome.tabs.sendMessage.
 * @returns {Promise<void>} Resolves after the request has been handled.
 */
async function ocr(data_url, image_data, tabId)
{
    try
    {
        const response = await fetch(data_url, {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json'
            },
            body: JSON.stringify({
              image_data: image_data
            })
        });

        if (!response.ok)
        {
            const reason = (response.status === 404)
                ? 'error 404'
                : 'some other error: ' + response.status;
            throw new Error(reason);
        }

        const data = await response.json();
        if (data)
        {
            console.log(data);
            chrome.tabs.sendMessage(tabId, data);
        }
    }
    catch (error)
    {
        console.log('error is', error);
    }
}

/**
 * POST an already-serialized JSON body to a URL and relay the JSON reply to a tab.
 *
 * Every non-OK status is treated as an error (not only 404), matching ocr();
 * errors are logged and nothing is sent to the tab.
 *
 * @param {string} data_url  Target URL.
 * @param {string} post_body Request body, sent as-is with a JSON content type.
 * @param {number} tabId     Tab that receives the parsed reply via chrome.tabs.sendMessage.
 * @returns {Promise<void>} Resolves after the request has been handled.
 */
async function post(data_url, post_body, tabId)
{
    try
    {
        const response = await fetch(data_url, {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json'
            },
            body: post_body
        });

        if (!response.ok)
        {
            const reason = (response.status === 404)
                ? 'error 404'
                : 'some other error: ' + response.status;
            throw new Error(reason);
        }

        const data = await response.json();
        if (data)
        {
            console.log(data);
            chrome.tabs.sendMessage(tabId, data);
        }
    }
    catch (error)
    {
        console.log('error is', error)
    }
}


Step 4: local web server 送出 click / send_key 等指令,給存取中的網頁。

selenium:

def sendkey_to_browser(driver, config_dict):
    """Run pending commands for this session's token, if a command file exists.

    The command file is ``<token>.tmp`` inside the application root. Nothing
    happens when ``config_dict`` has no "token" key or the file is absent.
    """
    if "token" not in config_dict:
        return

    pending_path = os.path.join(
        util.get_app_root(), config_dict["token"] + ".tmp"
    )
    if not os.path.exists(pending_path):
        return

    sendkey_to_browser_exist(driver, pending_path)

def sendkey_to_browser_exist(driver, tmp_filepath):
    """Replay the commands stored in a JSON command file on a Selenium driver.

    The file is expected to hold an object whose optional "command" list
    contains dicts with a "type" of "sendkey" (needs "selector" and "text")
    or "click" (needs "selector"). Entries with any other or missing type are
    skipped. The file is deleted only when no command failed, so failed
    commands are retried on the next call.

    Args:
        driver: Selenium WebDriver used to locate and drive the elements.
        tmp_filepath: Path of the JSON command file.
    """
    sendkey_dict = None
    try:
        with open(tmp_filepath) as json_data:
            sendkey_dict = json.load(json_data)
            print(sendkey_dict)
    except (OSError, ValueError) as exc:
        # Unreadable or invalid file: report it and leave the file in place.
        print("error on open file")
        print(exc)

    if not sendkey_dict:
        return

    all_command_done = True
    if "command" in sendkey_dict:
        for cmd_dict in sendkey_dict["command"]:
            # .get(): an entry without "type" is skipped like an unknown type
            # instead of raising KeyError out of this function.
            cmd_type = cmd_dict.get("type")

            if cmd_type == "sendkey":
                print("sendkey")
                try:
                    # Read "text" inside the try so a malformed entry is
                    # reported like any other failed command.
                    target_text = cmd_dict["text"]
                    form_input = driver.find_element(By.CSS_SELECTOR, cmd_dict["selector"])
                    # Skip retyping when the field already holds the text.
                    if form_input.get_attribute('value') != target_text:
                        form_input.clear()
                        form_input.click()
                        form_input.send_keys(target_text)
                except Exception as exc:
                    # Best effort: any driver error marks the batch as failed.
                    all_command_done = False
                    print("error on sendkey")
                    print(exc)
            elif cmd_type == "click":
                print("click")
                try:
                    form_input = driver.find_element(By.CSS_SELECTOR, cmd_dict["selector"])
                    form_input.click()
                except Exception as exc:
                    all_command_done = False
                    print("error on click")
                    print(exc)

            # Short pause between consecutive commands.
            time.sleep(0.05)

    # must all command success to delete tmp file.
    if all_command_done:
        try:
            os.unlink(tmp_filepath)
        except OSError:
            # The file may already be gone; nothing else to clean up.
            pass

nodriver:

async def sendkey_to_browser(tab, config_dict):
    """Run pending commands for this session's token, if a command file exists.

    The command file is ``<token>.tmp`` inside the application root. Nothing
    happens when ``config_dict`` has no "token" key or the file is absent.
    """
    if "token" not in config_dict:
        return

    pending_path = os.path.join(
        util.get_app_root(), config_dict["token"] + ".tmp"
    )
    if not os.path.exists(pending_path):
        return

    await sendkey_to_browser_exist(tab, pending_path)

async def sendkey_to_browser_exist(tab, tmp_filepath):
    """Replay the commands stored in a JSON command file on a nodriver tab.

    The file is expected to hold an object whose optional "command" list
    contains dicts with a "type" of "sendkey" (needs "selector" and "text")
    or "click" (needs "selector"). Entries with any other or missing type are
    skipped. The file is deleted only when no command raised, so failed
    commands are retried on the next call.

    Args:
        tab: nodriver tab used to query and drive the elements.
        tmp_filepath: Path of the JSON command file.
    """
    # Imported here so the function does not depend on a module-level import.
    import asyncio

    sendkey_dict = None
    try:
        with open(tmp_filepath) as json_data:
            sendkey_dict = json.load(json_data)
            print(sendkey_dict)
    except (OSError, ValueError) as e:
        # Unreadable or invalid file: report it and leave the file in place.
        print("error on open file")
        print(e)

    if not sendkey_dict:
        return

    all_command_done = True
    if "command" in sendkey_dict:
        for cmd_dict in sendkey_dict["command"]:
            # .get(): an entry without "type" is skipped like an unknown type
            # instead of raising KeyError out of this coroutine.
            cmd_type = cmd_dict.get("type")

            if cmd_type == "sendkey":
                print("sendkey")
                try:
                    # Read "text" inside the try so a malformed entry is
                    # reported like any other failed command.
                    target_text = cmd_dict["text"]
                    element = await tab.query_selector(cmd_dict["selector"])
                    # NOTE(review): a falsy result (no element) is ignored and
                    # still counts as done, so the command is not retried.
                    if element:
                        await element.click()
                        # Clear the current value before typing.
                        await element.apply('function (element) {element.value = ""; } ')
                        await element.send_keys(target_text)
                except Exception as e:
                    # Best effort: any error marks the batch as failed.
                    all_command_done = False
                    print(e)
            elif cmd_type == "click":
                print("click")
                try:
                    element = await tab.query_selector(cmd_dict["selector"])
                    if element:
                        await element.click()
                except Exception as e:
                    all_command_done = False
                    print(e)

            # Yield to the event loop while pausing; time.sleep() here would
            # block every other coroutine for the duration.
            await asyncio.sleep(0.05)

    # must all command success to delete tmp file.
    if all_command_done:
        try:
            os.unlink(tmp_filepath)
        except OSError:
            # The file may already be gone; nothing else to clean up.
            pass

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *