Skip to content

Commit

Permalink
fix: refactor how patching is done
Browse files Browse the repository at this point in the history
  • Loading branch information
Mattwmaster58 committed Nov 17, 2024
1 parent 538cce3 commit c11b224
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 117 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ async def main():

asyncio.run(main())
```
A set of
## Test results
## Todo
- make this work with playwright.launch_persistent_context

## A set of Test results

### playwright with stealth

Expand Down
257 changes: 142 additions & 115 deletions playwright_stealth/stealth.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ def enabled_scripts():
```
"""

_USER_AGENT_OVERRIDE_PIGGYBACK_KEY = "_stealth_user_agent"
_SEC_CH_UA_OVERRIDE_PIGGYBACK_KEY = "_stealth_sec_ch_ua"

def __init__(
self,
*,
Expand Down Expand Up @@ -225,6 +228,20 @@ def apply_stealth_sync(self, page_or_context: Union[sync_api.Page, sync_api.Brow
if len(self.script_payload) > 0:
page_or_context.add_init_script(self.script_payload)

def hook_playwright_context(self, ctx: Union[async_api.Playwright, sync_api.Playwright]) -> None:
"""
Given a Playwright context object, hooks all the browser type object methods that return a Browser object.
Can be used with sync and async methods contexts
"""
browser_class_name = sync_api.Browser.__name__
for browser_type in (ctx.chromium, ctx.firefox, ctx.webkit):
chromium_mode = browser_type.name == "chromium"
for name, hooked_method in inspect.getmembers(browser_type, predicate=inspect.ismethod):
# todo: ctx.browser.launch_persistent_context
if hooked_method.__annotations__.get("return") == browser_class_name:
hooked_method = self._generate_hooked_method_that_returns_browser(hooked_method, chromium_mode)
setattr(browser_type, name, hooked_method)

def _kwargs_with_patched_cli_arg(
self, method: Callable, packed_kwargs: Dict[str, Any], chromium_mode: bool
) -> Dict[str, Any]:
Expand All @@ -244,170 +261,180 @@ def _kwargs_with_patched_cli_arg(
new_kwargs["args"] = new_cli_args
return new_kwargs

def hook_playwright_context(self, ctx: Union[async_api.Playwright, sync_api.Playwright]) -> None:
"""
Given a Playwright context object, hooks all the browser type object methods that return a Browser object.
Can be used with sync and async methods contexts
"""
for browser_type in (ctx.chromium, ctx.firefox, ctx.webkit):
for name, method in inspect.getmembers(browser_type, predicate=inspect.ismethod):
if method.__annotations__.get("return") in (
"Browser",
"BrowserContext",
):
chromium_mode = browser_type.name == "chromium"
method = self._generate_hooked_method_that_returns_browser(method, chromium_mode)
setattr(browser_type, name, method)

def _generate_hooked_method_that_returns_browser(self, method: Callable, chromium_mode: bool):
async def async_hooked_method(*args, **kwargs) -> Union[async_api.Browser, async_api.BrowserContext]:
browser_or_context = await method(
async def async_hooked_method(*args, **kwargs) -> async_api.Browser:
browser = await method(
*args,
**self._kwargs_with_patched_cli_arg(method, kwargs, chromium_mode),
)
self._reassign_new_page_new_context(browser_or_context)
return browser_or_context
self._reassign_new_page_new_context(browser)
return browser

def sync_hooked_method(*args, **kwargs) -> Union[sync_api.Browser, sync_api.BrowserContext]:
browser_or_context = method(
def sync_hooked_method(*args, **kwargs) -> sync_api.Browser:
browser = method(
*args,
**self._kwargs_with_patched_cli_arg(method, kwargs, chromium_mode),
)
self._reassign_new_page_new_context(browser_or_context)
return browser_or_context
self._reassign_new_page_new_context(browser)
return browser

if inspect.iscoroutinefunction(method):
return async_hooked_method
return sync_hooked_method

def _generate_hooked_new_context(self, new_context_method: Callable) -> Callable:
def _generate_hooked_new_context(self, new_context_method: Callable, new_page_method: Callable) -> Callable:
async def hooked_new_context_async(*args, **kwargs):
context = await new_context_method(*args, **kwargs)
context.new_page = self._generate_hooked_new_page(context.new_page)
context = await new_context_method(
*args, **(await self._kwargs_new_page_context_with_patches_async(new_page_method, kwargs))
)
await self.apply_stealth_async(context)
return context

def hooked_browser_method_sync(*args, **kwargs):
context = new_context_method(*args, **kwargs)
context.new_page = self._generate_hooked_new_page(context.new_page)
context = new_context_method(
*args, **(self._kwargs_new_page_context_with_patches_sync(new_page_method, kwargs))
)
self.apply_stealth_sync(context)
return context

if inspect.iscoroutinefunction(new_context_method):
return hooked_new_context_async
return hooked_browser_method_sync

def _generate_hooked_new_page(self, new_page_method: Callable) -> Callable:
def _generate_hooked_new_page(self, new_page_method: Callable, patch_kwargs: bool) -> Callable:
"""
Returns a hooked method (async or sync) for new_page.
*args and **kwargs even though these methods may not take any number of arguments,
we want to preserve accurate stack traces when caller passes args improperly
we want to preserve accurate stack traces when caller passes args improperly.
If patch_kwargs is true, we patch kwargs the caller passes to enhance evasions. This only applies
to Browser.new_page, so we pass false when passing in a BrowserContext.new_page method
"""
USER_AGENT_OVERRIDE_PIGGYBACK_KEY = "_stealth_user_agent"
SEC_CH_UA_OVERRIDE_PIGGYBACK_KEY = "_stealth_sec_ch_ua"

browser_instance = new_page_method.__self__
is_chromium = browser_instance.browser_type.name == "chromium"
async def hooked_new_page_async(*args, **kwargs):
kwargs = (
await self._kwargs_new_page_context_with_patches_async(new_page_method, kwargs)
if patch_kwargs
else kwargs
)
page = await new_page_method(*args, **kwargs)
await self.apply_stealth_async(page)
return page

def hooked_new_page_sync(*args, **kwargs):
kwargs = (
self._kwargs_new_page_context_with_patches_sync(new_page_method, kwargs) if patch_kwargs else kwargs
)
page = new_page_method(*args, **kwargs)
self.apply_stealth_sync(page)
return page

if inspect.iscoroutinefunction(new_page_method):
return hooked_new_page_async
return hooked_new_page_sync

async def get_user_agent_and_sec_ch_ua_async(page_method: Callable) -> Tuple[str, str]:
"""
If there's no override, it's Chrome, and we haven't cached a UA value prior, we need to come up
with an accurate, non-headless UA ourselves. It's impossible to get UA without creating a temp page:
https://github.com/microsoft/playwright/issues/31743#issuecomment-2241550377
We can piggyback on the browser object to cache the UA - this way we don't
async def _kwargs_new_page_context_with_patches_async(
self, unpatched_new_page: Callable, packed_kwargs: Dict[str, Any]
) -> Dict[str, Any]:
"""
This returns kwargs with arguments added based on enabled evasions, while respecting any kwargs the caller
has passed in. If enabled and overrides aren't set for either navigator_user_agent or sec_ch_ua,
and we're dealing with a Chromium browser, we create a temporary page to get up-to-date UA information.
This function is suitable for patching kwargs for new_context and new_page, though in either case, an unpatched
new_page method must be provided.
Args:
unpatched_new_page: new_page
packed_kwargs: kwargs the caller has passed in
Returns:
patched kwargs
"""
browser_or_context = unpatched_new_page.__self__
if isinstance(browser_or_context, async_api.BrowserContext):
browser_instance = browser_or_context.browser
else:
browser_instance = browser_or_context
is_chromium = browser_instance.browser_type.name == "chromium"

Returns:
user_agent, sec_ch_ua
"""
async def get_user_agent_and_sec_ch_ua_async() -> Tuple[str, str]:
temp_page: Optional[async_api.Page]
stealth_user_agent = getattr(browser_instance, USER_AGENT_OVERRIDE_PIGGYBACK_KEY, None)
sec_ch_ua = getattr(browser_instance, SEC_CH_UA_OVERRIDE_PIGGYBACK_KEY, None)
stealth_user_agent = getattr(browser_instance, self._USER_AGENT_OVERRIDE_PIGGYBACK_KEY, None)
sec_ch_ua = getattr(browser_instance, self._SEC_CH_UA_OVERRIDE_PIGGYBACK_KEY, None)
if stealth_user_agent is None or sec_ch_ua is None:
temp_page = await page_method()
temp_page = await unpatched_new_page()
stealth_user_agent = (await temp_page.evaluate("navigator.userAgent")).replace(
"HeadlessChrome", "Chrome"
)
await temp_page.close(reason="playwright_stealth internal temp utility page")
sec_ch_ua = self._get_greased_chrome_sec_ua_ch(stealth_user_agent)
setattr(browser_instance, SEC_CH_UA_OVERRIDE_PIGGYBACK_KEY, sec_ch_ua)
setattr(browser_instance, USER_AGENT_OVERRIDE_PIGGYBACK_KEY, stealth_user_agent)
setattr(browser_instance, self._SEC_CH_UA_OVERRIDE_PIGGYBACK_KEY, sec_ch_ua)
setattr(browser_instance, self._USER_AGENT_OVERRIDE_PIGGYBACK_KEY, stealth_user_agent)
return stealth_user_agent, sec_ch_ua

def get_user_agent_and_sec_ch_ua_sync(page_method: Callable) -> Tuple[str, str]:
new_kwargs = deepcopy(packed_kwargs)
if self.navigator_user_agent and packed_kwargs.get("user_agent") is None:
resolved_user_agent_override = self.navigator_user_agent_override
if resolved_user_agent_override is None and is_chromium:
resolved_user_agent_override, _ = await get_user_agent_and_sec_ch_ua_async()
new_kwargs["user_agent"] = resolved_user_agent_override
extra_http_headers = packed_kwargs.get("extra_http_headers", {})
if self.sec_ch_ua and CaseInsensitiveDict(extra_http_headers).get("sec-ch-ua") is None:
resolved_sec_ch_ua_override = self.sec_ch_ua_override
if resolved_sec_ch_ua_override is None and is_chromium:
_, resolved_sec_ch_ua_override = await get_user_agent_and_sec_ch_ua_async()
if resolved_sec_ch_ua_override is not None:
extra_http_headers["sec-ch-ua"] = resolved_sec_ch_ua_override
new_kwargs["extra_http_headers"] = extra_http_headers

return new_kwargs

def _kwargs_new_page_context_with_patches_sync(
self, unpatched_new_page: Callable, packed_kwargs: Dict[str, Any]
) -> Dict[str, Any]:
"""see self._kwargs_new_page_context_with_patches_async for docs."""
browser_or_context = unpatched_new_page.__self__
if isinstance(browser_or_context, sync_api.BrowserContext):
browser_instance = browser_or_context.browser
else:
browser_instance = browser_or_context
is_chromium = browser_instance.browser_type.name == "chromium"

def get_user_agent_and_sec_ch_ua_sync() -> Tuple[str, str]:
temp_page: Optional[sync_api.Page]
stealth_user_agent = getattr(browser_instance, USER_AGENT_OVERRIDE_PIGGYBACK_KEY)
sec_ch_ua = getattr(browser_instance, SEC_CH_UA_OVERRIDE_PIGGYBACK_KEY)
stealth_user_agent = getattr(browser_instance, self._USER_AGENT_OVERRIDE_PIGGYBACK_KEY, None)
sec_ch_ua = getattr(browser_instance, self._SEC_CH_UA_OVERRIDE_PIGGYBACK_KEY, None)
if stealth_user_agent is None or sec_ch_ua is None:
temp_page = page_method()
temp_page = unpatched_new_page()
stealth_user_agent = temp_page.evaluate("navigator.userAgent").replace("HeadlessChrome", "Chrome")
sec_ch_ua = self._get_greased_chrome_sec_ua_ch(stealth_user_agent)
temp_page.close(reason="playwright_stealth internal temp utility page")
setattr(browser_instance, SEC_CH_UA_OVERRIDE_PIGGYBACK_KEY, sec_ch_ua)
setattr(browser_instance, USER_AGENT_OVERRIDE_PIGGYBACK_KEY, stealth_user_agent)
setattr(browser_instance, self._SEC_CH_UA_OVERRIDE_PIGGYBACK_KEY, sec_ch_ua)
setattr(browser_instance, self._USER_AGENT_OVERRIDE_PIGGYBACK_KEY, stealth_user_agent)
return stealth_user_agent, sec_ch_ua

async def hooked_browser_method_async(*args, **kwargs):
# respect any override the user passes themselves
if self.navigator_user_agent and kwargs.get("user_agent") is None:
resolved_user_agent_override = self.navigator_user_agent_override
if resolved_user_agent_override is None and is_chromium:
resolved_user_agent_override, _ = await get_user_agent_and_sec_ch_ua_async(new_page_method)
kwargs["user_agent"] = resolved_user_agent_override

extra_http_headers = kwargs.get("extra_http_headers", {})
# respect any override the user passes themselves
if self.sec_ch_ua and CaseInsensitiveDict(extra_http_headers).get("sec-ch-ua") is None:
resolved_sec_ch_ua_override = self.sec_ch_ua_override
if resolved_sec_ch_ua_override is None and is_chromium:
_, resolved_sec_ch_ua_override = await get_user_agent_and_sec_ch_ua_async(new_page_method)
if resolved_sec_ch_ua_override is not None:
# this could be tricky is a differently cased key of the same thing exists,
# but we have done a case-insensitive check above that precludes this
extra_http_headers["sec-ch-ua"] = resolved_sec_ch_ua_override
kwargs["extra_http_headers"] = extra_http_headers
page = await new_page_method(*args, **kwargs)
await self.apply_stealth_async(page)
return page

def hooked_browser_method_sync(*args, **kwargs):
if self.navigator_user_agent and kwargs.get("user_agent") is None:
resolved_user_agent_override = self.navigator_user_agent_override
if resolved_user_agent_override is None and is_chromium:
resolved_user_agent_override, _ = get_user_agent_and_sec_ch_ua_sync(new_page_method)
kwargs["user_agent"] = self.navigator_user_agent_override

extra_http_headers = kwargs.get("extra_http_headers", {})
if self.sec_ch_ua and CaseInsensitiveDict(extra_http_headers).get("sec-ch-ua") is None:
resolved_sec_ch_ua_override = self.sec_ch_ua_override
# respect any override the user has already made
if resolved_sec_ch_ua_override is None and is_chromium:
_, resolved_sec_ch_ua_override = get_user_agent_and_sec_ch_ua_sync(new_page_method)
if resolved_sec_ch_ua_override is not None:
# this could be tricky is a differently cased key of the same thing exists,
# but we have done a case-insensitive check above that precludes this
extra_http_headers["sec-ch-ua"] = resolved_sec_ch_ua_override
kwargs["extra_http_headers"] = extra_http_headers
page = new_page_method(*args, **kwargs)
self.apply_stealth_sync(page)
return page
new_kwargs = deepcopy(packed_kwargs)
if self.navigator_user_agent and packed_kwargs.get("user_agent") is None:
resolved_user_agent_override = self.navigator_user_agent_override
if resolved_user_agent_override is None and is_chromium:
resolved_user_agent_override, _ = get_user_agent_and_sec_ch_ua_sync()
new_kwargs["user_agent"] = resolved_user_agent_override
extra_http_headers = packed_kwargs.get("extra_http_headers", {})
if self.sec_ch_ua and CaseInsensitiveDict(extra_http_headers).get("sec-ch-ua") is None:
resolved_sec_ch_ua_override = self.sec_ch_ua_override
if resolved_sec_ch_ua_override is None and is_chromium:
_, resolved_sec_ch_ua_override = get_user_agent_and_sec_ch_ua_sync()
if resolved_sec_ch_ua_override is not None:
extra_http_headers["sec-ch-ua"] = resolved_sec_ch_ua_override
new_kwargs["extra_http_headers"] = extra_http_headers

if inspect.iscoroutinefunction(new_page_method):
return hooked_browser_method_async
return hooked_browser_method_sync
return new_kwargs

def _reassign_new_page_new_context(
self,
browser_or_context: Union[
async_api.BrowserContext, async_api.Browser, sync_api.BrowserContext, sync_api.Browser
],
):
if isinstance(browser_or_context, (async_api.BrowserContext, sync_api.BrowserContext)):
context: async_api.BrowserContext = browser_or_context
context.new_page = self._generate_hooked_new_page(context.new_page)
elif isinstance(browser_or_context, (async_api.Browser, sync_api.Browser)):
browser: async_api.Browser = browser_or_context
browser.new_page = self._generate_hooked_new_page(browser.new_page)
browser.new_context = self._generate_hooked_new_context(browser.new_context)
def _reassign_new_page_new_context(self, browser: Union[async_api.Browser, sync_api.Browser]) -> None:
if isinstance(browser, (async_api.Browser, sync_api.Browser)):
browser.new_context = self._generate_hooked_new_context(browser.new_context, browser.new_page)
browser.new_page = self._generate_hooked_new_page(browser.new_page, patch_kwargs=True)
else:
raise TypeError(f"unexpected type from function (bug): returned {browser_or_context}")
raise TypeError(f"unexpected type from function (bug): returned {browser}")

@staticmethod
def _get_greased_chrome_sec_ua_ch(user_agent: str):
Expand Down

0 comments on commit c11b224

Please sign in to comment.