Parallel results processing

Soxoj
2020-12-12 18:54:34 +03:00
parent 1535e291d1
commit 552489abcb
+207 -207
@@ -51,7 +51,7 @@ unsupported_characters = '#'
cookies_file = 'cookies.txt'
async def get_response(request_future, error_type, social_network, logger):
async def get_response(request_future, social_network, logger):
html_text = None
status_code = 0
@@ -97,19 +97,20 @@ async def get_response(request_future, error_type, social_network, logger):
return html_text, status_code, error_text, exception_text
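The hunk elides the body of get_response; judging only from the initialization shown above and the 4-tuple returned here, a plausible shape of the function after this commit is the following sketch (not the project's actual code; the placeholder strings are invented):

    async def get_response(request_future, social_network, logger):
        html_text = None
        status_code = 0
        error_text = 'Error?'    # invented placeholder
        exception_text = None
        try:
            response = await request_future   # aiohttp request object
            status_code = response.status
            html_text = await response.text()
            error_text = None
        except Exception as e:
            exception_text = str(e)
            logger.debug(f'{social_network}: {e}')
        return html_text, status_code, error_text, exception_text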
async def update_site_data_from_response(site, site_data, site_info, semaphore, logger):
async def update_site_data_from_response(sitename, site_data, results_info, semaphore, logger, query_notify):
async with semaphore:
future = site_info.get('request_future')
site_obj = site_data[sitename]
future = site_obj.get('request_future')
if not future:
# ignore: search by incompatible id type
return
error_type = site_info['errorType']
site_data[site]['resp'] = await get_response(request_future=future,
error_type=error_type,
social_network=site,
response = await get_response(request_future=future,
social_network=sitename,
logger=logger)
site_data[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj, sitename)
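This is the heart of the change: each site gets a worker coroutine that holds a semaphore slot while its response is awaited, then hands off to the synchronous process_site_result. A minimal, self-contained sketch of the same bounded-concurrency pattern (the site names and the sleep() workload are illustrative only):

    import asyncio

    async def check_site(name, semaphore):
        # A slot is held only while this site's "request" is in flight.
        async with semaphore:
            await asyncio.sleep(0.1)  # stand-in for awaiting the response
            return name, 'checked'

    async def main():
        semaphore = asyncio.Semaphore(100)  # cf. max_connections=100 below
        tasks = [asyncio.ensure_future(check_site(name, semaphore))
                 for name in ('GitHub', 'Habr', 'Reddit')]
        # All workers run concurrently, capped by the semaphore.
        print(dict(await asyncio.gather(*tasks)))

    asyncio.run(main())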
# TODO: move into a separate module
def detect_error_page(html_text, status_code, fail_flags, ignore_403):
@@ -132,199 +133,20 @@ def detect_error_page(html_text, status_code, fail_flags, ignore_403):
return None, None
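Only the tail of detect_error_page is visible in this hunk; from its signature and the (None, None) fallthrough, a guessed reconstruction might look like this (purely illustrative, not the repository's code):

    def detect_error_page(html_text, status_code, fail_flags, ignore_403):
        # Guessed logic: look for site-specific failure markers in the body.
        for flag, message in fail_flags.items():
            if flag in html_text:
                return 'site flag', message
        # Guessed logic: treat hard HTTP errors as failures too.
        if status_code == 403 and not ignore_403:
            return 'access denied', 'Access denied, use proxy/VPN'
        if status_code >= 500:
            return 'server error', f'Server error {status_code}'
        return None, None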
async def maigret(username, site_data, query_notify, logger,
proxy=None, timeout=None, recursive_search=False,
id_type='username', tags=None, debug=False, forced=False,
max_connections=100):
"""Main search func
Checks for existence of username on various social media sites.
Keyword Arguments:
username -- String indicating the username the report
should be created against.
site_data -- Dictionary containing all of the site data.
query_notify -- Object with base type of QueryNotify().
This will be used to notify the caller about
query results.
proxy -- String indicating the proxy URL
timeout -- Time in seconds to wait before timing out request.
Default is no timeout.
recursive_search -- Search for other usernames in site pages and recursively search by them.
Return Value:
Dictionary containing results from report. Key of dictionary is the name
of the social network site, and the value is another dictionary with
the following keys:
url_main: URL of main site.
url_user: URL of user on site (if account exists).
status: QueryResult() object indicating results of test for
account existence.
http_status: HTTP status code of query which checked for existence on
site.
response_text: Text that came back from request. May be None if
there was an HTTP error when checking for existence.
"""
# Notify caller that we are starting the query.
if tags is None:
tags = set()
query_notify.start(username, id_type)
# TODO: connector
connector = ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False)
# connector = aiohttp.TCPConnector(ssl=False)
connector.verify_ssl = False
session = aiohttp.ClientSession(connector=connector)
if logger.level == logging.DEBUG:
future = session.get(url='https://icanhazip.com')
ip, status, error, exception = await get_response(future, None, 'probe', logger)
if ip:
logger.debug(f'My IP is: {ip.strip()}')
else:
logger.debug(f'IP request failed with {error}: {exception}')
# Results from analysis of all sites
results_total = {}
# First create futures for all requests. This allows for the requests to run in parallel
for social_network, net_info in site_data.items():
if net_info.get('type', 'username') != id_type:
continue
site_tags = set(net_info.get('tags', []))
if tags:
if not set(tags).intersection(site_tags):
continue
if 'disabled' in net_info and net_info['disabled'] and not forced:
continue
# Results from analysis of this specific site
results_site = {}
# Record URL of main site
results_site['url_main'] = net_info.get("urlMain")
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11.1; rv:55.0) Gecko/20100101 Firefox/55.0',
}
if "headers" in net_info:
# Override/append any extra headers required by a given site.
headers.update(net_info["headers"])
# URL of user on site (if it exists)
url = net_info.get('url').format(
urlMain=net_info['urlMain'],
urlSubpath=net_info.get('urlSubpath', ''),
username=username
)
# workaround to prevent slash errors
url = url.replace('///', '/')
# Don't make request if username is invalid for the site
regex_check = net_info.get("regexCheck")
if regex_check and re.search(regex_check, username) is None:
# No need to do the check at the site: this username is not allowed.
results_site['status'] = QueryResult(username,
social_network,
url,
QueryStatus.ILLEGAL)
results_site["url_user"] = ""
results_site['http_status'] = ""
results_site['response_text'] = ""
query_notify.update(results_site['status'])
else:
# URL of user on site (if it exists)
results_site["url_user"] = url
url_probe = net_info.get("urlProbe")
if url_probe is None:
# Probe URL is the normal one seen by people on the web.
url_probe = url
else:
# There is a special URL for probing existence separate
# from where the user profile normally can be found.
url_probe = url_probe.format(
urlMain=net_info['urlMain'],
urlSubpath=net_info.get('urlSubpath', ''),
username=username,
)
if net_info["errorType"] == 'status_code' and net_info.get("request_head_only", True):
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
request_method = session.head
else:
# Either this detect method needs the content associated
# with the GET response, or this specific website will
# not respond properly unless we request the whole page.
request_method = session.get
if net_info["errorType"] == "response_url":
# Site forwards request to a different URL if username not
# found. Disallow the redirect so we can capture the
# http status from the original URL request.
allow_redirects = False
else:
# Allow whatever redirect that the site wants to do.
# The final result of the request will be what is available.
allow_redirects = True
# TODO: cookies support
def parse_cookies(cookies_str):
cookies = SimpleCookie()
cookies.load(cookies_str)
return {key: morsel.value for key, morsel in cookies.items()}
if os.path.exists(cookies_file):
cookies_obj = cookielib.MozillaCookieJar(cookies_file)
cookies_obj.load(ignore_discard=True, ignore_expires=True)
else:
cookies_obj = []
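Both cookie mechanisms touched here are pure standard library: SimpleCookie parses a header-style cookie string, while MozillaCookieJar reads a Netscape-format cookies.txt. A quick standalone illustration (cookie values invented, and cookies.txt assumed to exist):

    from http.cookies import SimpleCookie
    import http.cookiejar as cookielib

    # Parse a raw "Cookie:" header string into a plain dict.
    cookies = SimpleCookie()
    cookies.load('sessionid=abc123; csrftoken=xyz')
    print({key: morsel.value for key, morsel in cookies.items()})
    # {'sessionid': 'abc123', 'csrftoken': 'xyz'}

    # Load a Netscape-format cookies.txt, keeping session and expired entries.
    cookies_obj = cookielib.MozillaCookieJar('cookies.txt')
    cookies_obj.load(ignore_discard=True, ignore_expires=True)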
future = request_method(url=url_probe, headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
)
# Store future in data for access later
net_info["request_future"] = future
# Add this site's results into final dictionary with all of the other results.
results_total[social_network] = results_site
# TODO: move into top-level function
sem = asyncio.Semaphore(max_connections)
tasks = []
for social_network, net_info in site_data.items():
future = asyncio.ensure_future(update_site_data_from_response(social_network, site_data, net_info, sem, logger))
tasks.append(future)
await asyncio.gather(*tasks)
await session.close()
# TODO: split into separate functions
for social_network, net_info in site_data.items():
# Retrieve results again
results_site = results_total.get(social_network)
if not results_site:
continue
def process_site_result(response, query_notify, logger, results_info, net_info, social_network):
if not response:
return results_info
# Retrieve other site information again
url = results_site.get("url_user")
username = results_info['username']
is_parsing_enabled = results_info['parsing_enabled']
url = results_info.get("url_user")
logger.debug(url)
status = results_site.get("status")
status = results_info.get("status")
if status is not None:
# We have already determined the user doesn't exist here
continue
return results_info
# Get the expected error type
error_type = net_info["errorType"]
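The diff names two of the detection strategies selected by errorType: 'status_code' (the probe's HTTP status is the signal) and 'response_url' (a redirect is the signal, hence redirects get disabled). A hedged sketch of such a dispatch, with a 'message' strategy added by assumption and all names invented:

    def looks_claimed(error_type, status_code, html_text, error_msg=None):
        # Hypothetical helper; mirrors the strategies by name only.
        if error_type == 'message':
            # A known "no such user" phrase in the body means the name is free.
            return bool(html_text) and error_msg not in html_text
        if error_type in ('status_code', 'response_url'):
            # status_code: a 2xx probe status signals existence.
            # response_url: redirects are disabled for these sites, so a 2xx
            # response (no redirect fired) likewise signals existence.
            return 200 <= status_code < 300
        raise ValueError(f'unknown error type: {error_type}')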
@@ -333,18 +155,17 @@ async def maigret(username, site_data, query_notify, logger,
failure_errors = net_info.get("errors", {})
# TODO: refactor
resp = net_info.get('resp')
if not resp:
if not response:
logger.error(f'No response for {social_network}')
continue
return results_info
html_text, status_code, error_text, exception_text = resp
html_text, status_code, error_text, exception_text = response
site_error_text = '?'
# TODO: measure elapsed request time
response_time = None
if debug:
if logger.level == logging.DEBUG:
with open('debug.txt', 'a') as f:
status = status_code or 'No response'
f.write(f'url: {url}\nerror: {str(error_text)}\nr: {status}\n')
@@ -426,7 +247,7 @@ async def maigret(username, site_data, query_notify, logger,
extracted_ids_data = {}
if recursive_search and result.status == QueryStatus.CLAIMED:
if is_parsing_enabled and result.status == QueryStatus.CLAIMED:
try:
extracted_ids_data = extract(html_text)
except Exception as e:
@@ -440,7 +261,7 @@ async def maigret(username, site_data, query_notify, logger,
if k in supported_recursive_search_ids:
new_usernames[v] = k
results_site['ids_usernames'] = new_usernames
results_info['ids_usernames'] = new_usernames
result.ids_data = extracted_ids_data
is_similar = net_info.get('similarSearch', False)
@@ -448,17 +269,194 @@ async def maigret(username, site_data, query_notify, logger,
query_notify.update(result, is_similar)
# Save status of request
results_site['status'] = result
results_info['status'] = result
# Save results from request
results_site['http_status'] = status_code
results_site['is_similar'] = is_similar
results_info['http_status'] = status_code
results_info['is_similar'] = is_similar
# results_site['response_text'] = html_text
results_site['rank'] = net_info.get('rank', 0)
results_info['rank'] = net_info.get('rank', 0)
return results_info
async def maigret(username, site_data, query_notify, logger,
proxy=None, timeout=None, recursive_search=False,
id_type='username', tags=None, debug=False, forced=False,
max_connections=100):
"""Main search func
Checks for existence of username on various social media sites.
Keyword Arguments:
username -- String indicating the username the report
should be created against.
site_data -- Dictionary containing all of the site data.
query_notify -- Object with base type of QueryNotify().
This will be used to notify the caller about
query results.
proxy -- String indicating the proxy URL
timeout -- Time in seconds to wait before timing out request.
Default is no timeout.
recursive_search -- Search for other usernames in site pages and recursively search by them.
Return Value:
Dictionary containing results from report. Key of dictionary is the name
of the social network site, and the value is another dictionary with
the following keys:
url_main: URL of main site.
url_user: URL of user on site (if account exists).
status: QueryResult() object indicating results of test for
account existence.
http_status: HTTP status code of query which checked for existence on
site.
response_text: Text that came back from request. May be None if
there was an HTTP error when checking for existence.
"""
# Notify caller that we are starting the query.
if tags is None:
tags = set()
query_notify.start(username, id_type)
# TODO: connector
connector = ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False)
# connector = aiohttp.TCPConnector(ssl=False)
connector.verify_ssl = False
session = aiohttp.ClientSession(connector=connector)
if logger.level == logging.DEBUG:
future = session.get(url='https://icanhazip.com')
ip, status, error, exception = await get_response(future, None, logger)
if ip:
logger.debug(f'My IP is: {ip.strip()}')
else:
logger.debug(f'IP request failed with {error}: {exception}')
# Results from analysis of all sites
results_total = {}
# First create futures for all requests. This allows for the requests to run in parallel
for social_network, net_info in site_data.items():
if net_info.get('type', 'username') != id_type:
continue
site_tags = set(net_info.get('tags', []))
if tags:
if not set(tags).intersection(site_tags):
continue
if 'disabled' in net_info and net_info['disabled'] and not forced:
continue
# Results from analysis of this specific site
results_site = {}
# Record URL of main site and username
results_site['username'] = username
results_site['parsing_enabled'] = recursive_search
results_site['url_main'] = net_info.get("urlMain")
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11.1; rv:55.0) Gecko/20100101 Firefox/55.0',
}
if "headers" in net_info:
# Override/append any extra headers required by a given site.
headers.update(net_info["headers"])
# URL of user on site (if it exists)
url = net_info.get('url').format(
urlMain=net_info['urlMain'],
urlSubpath=net_info.get('urlSubpath', ''),
username=username
)
# workaround to prevent slash errors
url = url.replace('///', '/')
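The profile URL is rendered from a per-site template; urlSubpath is optional, which is how stray triple slashes can appear and why the replace() workaround follows. A worked example with an invented site entry:

    # Hypothetical entry; real ones live in maigret's site database.
    net_info = {'urlMain': 'https://example.com/',
                'url': '{urlMain}/{urlSubpath}/{username}'}

    url = net_info['url'].format(urlMain=net_info['urlMain'],
                                 urlSubpath=net_info.get('urlSubpath', ''),
                                 username='alice')
    print(url)                      # https://example.com///alice
    print(url.replace('///', '/'))  # https://example.com/alice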
# Don't make request if username is invalid for the site
regex_check = net_info.get("regexCheck")
if regex_check and re.search(regex_check, username) is None:
# No need to do the check at the site: this username is not allowed.
results_site['status'] = QueryResult(username,
social_network,
url,
QueryStatus.ILLEGAL)
results_site["url_user"] = ""
results_site['http_status'] = ""
results_site['response_text'] = ""
query_notify.update(results_site['status'])
else:
# URL of user on site (if it exists)
results_site["url_user"] = url
url_probe = net_info.get("urlProbe")
if url_probe is None:
# Probe URL is the normal one seen by people on the web.
url_probe = url
else:
# There is a special URL for probing existence separate
# from where the user profile normally can be found.
url_probe = url_probe.format(
urlMain=net_info['urlMain'],
urlSubpath=net_info.get('urlSubpath', ''),
username=username,
)
if net_info["errorType"] == 'status_code' and net_info.get("request_head_only", True):
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
request_method = session.head
else:
# Either this detect method needs the content associated
# with the GET response, or this specific website will
# not respond properly unless we request the whole page.
request_method = session.get
if net_info["errorType"] == "response_url":
# Site forwards request to a different URL if username not
# found. Disallow the redirect so we can capture the
# http status from the original URL request.
allow_redirects = False
else:
# Allow whatever redirect that the site wants to do.
# The final result of the request will be what is available.
allow_redirects = True
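Two request knobs fall out of errorType here: whether a response body is needed at all (HEAD vs GET) and whether redirects may be followed. The same selection logic, condensed into a standalone helper (hypothetical request_options function, identical rules):

    def request_options(net_info):
        # HEAD suffices when only the status code matters and the site
        # does not insist on a full GET.
        head_only = (net_info['errorType'] == 'status_code'
                     and net_info.get('request_head_only', True))
        # For response_url detection the redirect itself is the signal,
        # so it must not be followed.
        allow_redirects = net_info['errorType'] != 'response_url'
        return ('HEAD' if head_only else 'GET'), allow_redirects

    print(request_options({'errorType': 'status_code'}))   # ('HEAD', True)
    print(request_options({'errorType': 'response_url'}))  # ('GET', False)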
# TODO: cookies support
# def parse_cookies(cookies_str):
# cookies = SimpleCookie()
# cookies.load(cookies_str)
# return {key: morsel.value for key, morsel in cookies.items()}
#
# if os.path.exists(cookies_file):
# cookies_obj = cookielib.MozillaCookieJar(cookies_file)
# cookies_obj.load(ignore_discard=True, ignore_expires=True)
future = request_method(url=url_probe, headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
)
# Store future in data for access later
net_info["request_future"] = future
# Add this site's results into final dictionary with all of the other results.
results_total[social_network] = results_site
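Note that request_method(...) is never awaited inside this loop: aiohttp's session.get()/session.head() return lazy awaitables that perform no network I/O until awaited, which is what lets every probe be created up front and dispatched concurrently afterwards. In miniature (a sketch, not the project's code):

    import asyncio
    import aiohttp

    async def demo():
        async with aiohttp.ClientSession() as session:
            # Nothing has been sent yet; these are lazy awaitables.
            futures = {'example': session.get('https://example.com')}
            for name, future in futures.items():
                response = await future  # the request happens only here
                print(name, response.status)

    asyncio.run(demo())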
# TODO: move into top-level function
sem = asyncio.Semaphore(max_connections)
tasks = []
for sitename, result_obj in results_total.items():
update_site_coro = update_site_data_from_response(sitename, site_data, result_obj, sem, logger, query_notify)
future = asyncio.ensure_future(update_site_coro)
tasks.append(future)
await asyncio.gather(*tasks)
await session.close()
# Notify caller that all queries are finished.
query_notify.finish()
@@ -816,7 +814,7 @@ async def main():
continue
results = await maigret(username,
site_data,
dict(site_data),
query_notify,
proxy=args.proxy,
timeout=args.timeout,
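main() now hands maigret() a shallow copy: update_site_data_from_response rebinds site_data[sitename] to the processed result (first hunk above), so copying keeps the caller's site database pristine between usernames. In miniature:

    site_data = {'GitHub': {'urlMain': 'https://github.com/'}}
    per_run = dict(site_data)                    # shallow copy
    per_run['GitHub'] = {'status': 'processed'}  # rebinding a key here...
    print(site_data['GitHub'])                   # ...leaves the original intact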
@@ -842,7 +840,9 @@ async def main():
exists_counter = 0
for website_name in results:
dictionary = results[website_name]
# TODO: fix no site data issue
if not dictionary:
continue
new_usernames = dictionary.get('ids_usernames')
if new_usernames:
for u, utype in new_usernames.items():
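The hunk breaks off inside this loop, but the shape of the data it walks is visible earlier: process_site_result stores ids_usernames as a value-to-type mapping ({extracted_id: id_type}). An example of consuming it, with invented data:

    results = {'Habr': {'ids_usernames': {'alice': 'username',
                                          '12345': 'uid'}}}  # invented data
    recursive_queue = {}
    for dictionary in results.values():
        if not dictionary:
            continue
        for u, utype in dictionary.get('ids_usernames', {}).items():
            recursive_queue[u] = utype
    print(recursive_queue)  # {'alice': 'username', '12345': 'uid'}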