Merge pull request #117 from soxoj/retries-refactoring

Introduced the `--retries` flag and made a thorough refactoring
soxoj authored on 2021-05-01 23:58:28 +03:00 (committed by GitHub)
18 changed files with 6182 additions and 4943 deletions
+2 -1
@@ -26,6 +26,7 @@ Currently supported more than 2000 sites ([full list](./sites.md)), by default s
 * Search by tags (site categories, countries)
 * Censorship and captcha detection
 * Very few false positives
+* Failed requests' restarts
 ## Installation
@@ -49,7 +50,7 @@ pip3 install .
 git clone https://github.com/soxoj/maigret && cd maigret
 ```
-You can use your a free virtual machine, the repo will be automatically cloned:
+You can use a free virtual machine, the repo will be automatically cloned:
 [![Open in Cloud Shell](https://user-images.githubusercontent.com/27065646/92304704-8d146d80-ef80-11ea-8c29-0deaabb1c702.png)](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/soxoj/maigret&tutorial=README.md) [![Run on Repl.it](https://user-images.githubusercontent.com/27065646/92304596-bf719b00-ef7f-11ea-987f-2c1f3c323088.png)](https://repl.it/github/soxoj/maigret)
 <a href="https://colab.research.google.com/gist//soxoj/879b51bc3b2f8b695abb054090645000/maigret.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab" height="40"></a>
+1 -1
@@ -1,5 +1,5 @@
 #!/bin/sh
-FILES="maigret wizard.py maigret.py"
+FILES="maigret wizard.py maigret.py tests"
 echo 'black'
 black --skip-string-normalization $FILES
+2 -2
@@ -1,5 +1,5 @@
 #!/bin/sh
-FILES="maigret wizard.py maigret.py"
+FILES="maigret wizard.py maigret.py tests"
 echo 'syntax errors or undefined names'
 flake8 --count --select=E9,F63,F7,F82 --show-source --statistics $FILES
@@ -8,4 +8,4 @@ echo 'warning'
 flake8 --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics --ignore=E731,W503 $FILES
 echo 'mypy'
-mypy ./maigret
+mypy ./maigret ./wizard.py ./tests
+260 -223
@@ -5,7 +5,7 @@ import re
import ssl import ssl
import sys import sys
import tqdm import tqdm
from typing import Tuple, Optional from typing import Tuple, Optional, Dict, List
import aiohttp import aiohttp
import tqdm.asyncio import tqdm.asyncio
@@ -16,9 +16,14 @@ from socid_extractor import extract
from .activation import ParsingActivator, import_aiohttp_cookies from .activation import ParsingActivator, import_aiohttp_cookies
from . import errors from . import errors
from .errors import CheckError from .errors import CheckError
from .executors import AsyncioSimpleExecutor, AsyncioProgressbarQueueExecutor from .executors import (
AsyncExecutor,
AsyncioSimpleExecutor,
AsyncioProgressbarQueueExecutor,
)
from .result import QueryResult, QueryStatus from .result import QueryResult, QueryStatus
from .sites import MaigretDatabase, MaigretSite from .sites import MaigretDatabase, MaigretSite
from .types import QueryOptions, QueryResultWrapper
from .utils import get_random_user_agent from .utils import get_random_user_agent
@@ -35,12 +40,10 @@ supported_recursive_search_ids = (
unsupported_characters = "#" unsupported_characters = "#"
async def get_response( async def get_response(request_future, logger) -> Tuple[str, int, Optional[CheckError]]:
request_future, site_name, logger
) -> Tuple[str, int, Optional[CheckError]]:
html_text = None html_text = None
status_code = 0 status_code = 0
error: Optional[CheckError] = CheckError("Error") error: Optional[CheckError] = CheckError("Unknown")
try: try:
response = await request_future response = await request_future
@@ -76,32 +79,12 @@ async def get_response(
): ):
error = CheckError("SSL", str(e)) error = CheckError("SSL", str(e))
else: else:
logger.warning(f"Unhandled error while requesting {site_name}: {e}")
logger.debug(e, exc_info=True) logger.debug(e, exc_info=True)
error = CheckError("Error", str(e)) error = CheckError("Unexpected", str(e))
# TODO: return only needed information
return str(html_text), status_code, error return str(html_text), status_code, error
async def update_site_dict_from_response(
sitename, site_dict, results_info, logger, query_notify
):
site_obj = site_dict[sitename]
future = site_obj.request_future
if not future:
# ignore: search by incompatible id type
return
response = await get_response(
request_future=future, site_name=sitename, logger=logger
)
return sitename, process_site_result(
response, query_notify, logger, results_info, site_obj
)
# TODO: move to separate class # TODO: move to separate class
def detect_error_page( def detect_error_page(
html_text, status_code, fail_flags, ignore_403 html_text, status_code, fail_flags, ignore_403
@@ -127,7 +110,7 @@ def detect_error_page(
def process_site_result( def process_site_result(
response, query_notify, logger, results_info, site: MaigretSite response, query_notify, logger, results_info: QueryResultWrapper, site: MaigretSite
): ):
if not response: if not response:
return results_info return results_info
@@ -205,6 +188,17 @@ def process_site_result(
logger.debug(presense_flag) logger.debug(presense_flag)
break break
def build_result(status, **kwargs):
return QueryResult(
username,
site_name,
url,
status,
query_time=response_time,
tags=fulltags,
**kwargs,
)
if check_error: if check_error:
logger.debug(check_error) logger.debug(check_error)
result = QueryResult( result = QueryResult(
@@ -218,53 +212,20 @@ def process_site_result(
tags=fulltags, tags=fulltags,
) )
elif check_type == "message": elif check_type == "message":
absence_flags = site.absence_strs
is_absence_flags_list = isinstance(absence_flags, list)
absence_flags_set = (
set(absence_flags) if is_absence_flags_list else {absence_flags}
)
# Checks if the error message is in the HTML # Checks if the error message is in the HTML
is_absence_detected = any( is_absence_detected = any(
[(absence_flag in html_text) for absence_flag in absence_flags_set] [(absence_flag in html_text) for absence_flag in site.absence_strs]
) )
if not is_absence_detected and is_presense_detected: if not is_absence_detected and is_presense_detected:
result = QueryResult( result = build_result(QueryStatus.CLAIMED)
username,
site_name,
url,
QueryStatus.CLAIMED,
query_time=response_time,
tags=fulltags,
)
else: else:
result = QueryResult( result = build_result(QueryStatus.AVAILABLE)
username,
site_name,
url,
QueryStatus.AVAILABLE,
query_time=response_time,
tags=fulltags,
)
elif check_type == "status_code": elif check_type == "status_code":
# Checks if the status code of the response is 2XX # Checks if the status code of the response is 2XX
if (not status_code >= 300 or status_code < 200) and is_presense_detected: if is_presense_detected and (not status_code >= 300 or status_code < 200):
result = QueryResult( result = build_result(QueryStatus.CLAIMED)
username,
site_name,
url,
QueryStatus.CLAIMED,
query_time=response_time,
tags=fulltags,
)
else: else:
result = QueryResult( result = build_result(QueryStatus.AVAILABLE)
username,
site_name,
url,
QueryStatus.AVAILABLE,
query_time=response_time,
tags=fulltags,
)
elif check_type == "response_url": elif check_type == "response_url":
# For this detection method, we have turned off the redirect. # For this detection method, we have turned off the redirect.
# So, there is no need to check the response URL: it will always # So, there is no need to check the response URL: it will always
@@ -272,23 +233,9 @@ def process_site_result(
# code indicates that the request was successful (i.e. no 404, or # code indicates that the request was successful (i.e. no 404, or
# forward to some odd redirect). # forward to some odd redirect).
if 200 <= status_code < 300 and is_presense_detected: if 200 <= status_code < 300 and is_presense_detected:
result = QueryResult( result = build_result(QueryStatus.CLAIMED)
username,
site_name,
url,
QueryStatus.CLAIMED,
query_time=response_time,
tags=fulltags,
)
else: else:
result = QueryResult( result = build_result(QueryStatus.AVAILABLE)
username,
site_name,
url,
QueryStatus.AVAILABLE,
query_time=response_time,
tags=fulltags,
)
else: else:
# It should be impossible to ever get here... # It should be impossible to ever get here...
raise ValueError( raise ValueError(
@@ -329,9 +276,168 @@ def process_site_result(
return results_info return results_info
def make_site_result(
site: MaigretSite, username: str, options: QueryOptions, logger
) -> QueryResultWrapper:
results_site: QueryResultWrapper = {}
# Record URL of main site and username
results_site["site"] = site
results_site["username"] = username
results_site["parsing_enabled"] = options["parsing"]
results_site["url_main"] = site.url_main
results_site["cookies"] = (
options.get("cookie_jar")
and options["cookie_jar"].filter_cookies(site.url_main)
or None
)
headers = {
"User-Agent": get_random_user_agent(),
}
headers.update(site.headers)
if "url" not in site.__dict__:
logger.error("No URL for site %s", site.name)
# URL of user on site (if it exists)
url = site.url.format(
urlMain=site.url_main, urlSubpath=site.url_subpath, username=username
)
# workaround to prevent slash errors
url = re.sub("(?<!:)/+", "/", url)
session = options['session']
# site check is disabled
if site.disabled and not options['forced']:
logger.debug(f"Site {site.name} is disabled, skipping...")
results_site["status"] = QueryResult(
username,
site.name,
url,
QueryStatus.ILLEGAL,
error=CheckError("Check is disabled"),
)
# current username type could not be applied
elif site.type != options["id_type"]:
results_site["status"] = QueryResult(
username,
site.name,
url,
QueryStatus.ILLEGAL,
error=CheckError('Unsupported identifier type', f'Want "{site.type}"'),
)
# username is not allowed.
elif site.regex_check and re.search(site.regex_check, username) is None:
results_site["status"] = QueryResult(
username,
site.name,
url,
QueryStatus.ILLEGAL,
error=CheckError(
'Unsupported username format', f'Want "{site.regex_check}"'
),
)
results_site["url_user"] = ""
results_site["http_status"] = ""
results_site["response_text"] = ""
# query_notify.update(results_site["status"])
else:
# URL of user on site (if it exists)
results_site["url_user"] = url
url_probe = site.url_probe
if url_probe is None:
# Probe URL is normal one seen by people out on the web.
url_probe = url
else:
# There is a special URL for probing existence separate
# from where the user profile normally can be found.
url_probe = url_probe.format(
urlMain=site.url_main,
urlSubpath=site.url_subpath,
username=username,
)
for k, v in site.get_params.items():
url_probe += f"&{k}={v}"
if site.check_type == "status_code" and site.request_head_only:
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
request_method = session.head
else:
# Either this detect method needs the content associated
# with the GET response, or this specific website will
# not respond properly unless we request the whole page.
request_method = session.get
if site.check_type == "response_url":
# Site forwards request to a different URL if username not
# found. Disallow the redirect so we can capture the
# http status from the original URL request.
allow_redirects = False
else:
# Allow whatever redirect that the site wants to do.
# The final result of the request will be what is available.
allow_redirects = True
future = request_method(
url=url_probe,
headers=headers,
allow_redirects=allow_redirects,
timeout=options['timeout'],
)
# Store future request object in the results object
results_site["future"] = future
return results_site
async def check_site_for_username(
site, username, options: QueryOptions, logger, query_notify, *args, **kwargs
) -> Tuple[str, QueryResultWrapper]:
default_result = make_site_result(site, username, options, logger)
future = default_result.get("future")
if not future:
return site.name, default_result
response = await get_response(request_future=future, logger=logger)
response_result = process_site_result(
response, query_notify, logger, default_result, site
)
return site.name, response_result
async def debug_ip_request(session, logger):
future = session.get(url="https://icanhazip.com")
ip, status, check_error = await get_response(future, logger)
if ip:
logger.debug(f"My IP is: {ip.strip()}")
else:
logger.debug(f"IP requesting {check_error.type}: {check_error.desc}")
def get_failed_sites(results: Dict[str, QueryResultWrapper]) -> List[str]:
sites = []
for sitename, r in results.items():
status = r.get('status', {})
if status and status.error:
if errors.is_permanent(status.error.type):
continue
sites.append(sitename)
return sites
async def maigret( async def maigret(
username, username: str,
site_dict, site_dict: Dict[str, MaigretSite],
logger, logger,
query_notify=None, query_notify=None,
proxy=None, proxy=None,
@@ -343,14 +449,15 @@ async def maigret(
max_connections=100, max_connections=100,
no_progressbar=False, no_progressbar=False,
cookies=None, cookies=None,
): retries=0,
) -> QueryResultWrapper:
"""Main search func """Main search func
Checks for existence of username on certain sites. Checks for existence of username on certain sites.
Keyword Arguments: Keyword Arguments:
username -- Username string will be used for search. username -- Username string will be used for search.
site_dict -- Dictionary containing sites data. site_dict -- Dictionary containing sites data in MaigretSite objects.
query_notify -- Object with base type of QueryNotify(). query_notify -- Object with base type of QueryNotify().
This will be used to notify the caller about This will be used to notify the caller about
query results. query results.
@@ -380,17 +487,16 @@ async def maigret(
there was an HTTP error when checking for existence. there was an HTTP error when checking for existence.
""" """
# Notify caller that we are starting the query. # notify caller that we are starting the query.
if not query_notify: if not query_notify:
query_notify = Mock() query_notify = Mock()
query_notify.start(username, id_type) query_notify.start(username, id_type)
# TODO: connector # make http client session
connector = ( connector = (
ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False) ProxyConnector.from_url(proxy) if proxy else aiohttp.TCPConnector(ssl=False)
) )
# connector = aiohttp.TCPConnector(ssl=False)
connector.verify_ssl = False connector.verify_ssl = False
cookie_jar = None cookie_jar = None
@@ -403,126 +509,10 @@ async def maigret(
) )
if logger.level == logging.DEBUG: if logger.level == logging.DEBUG:
future = session.get(url="https://icanhazip.com") await debug_ip_request(session, logger)
ip, status, check_error = await get_response(future, None, logger)
if ip:
logger.debug(f"My IP is: {ip.strip()}")
else:
logger.debug(f"IP requesting {check_error[0]}: {check_error[1]}")
# Results from analysis of all sites
results_total = {}
# First create futures for all requests. This allows for the requests to run in parallel
for site_name, site in site_dict.items():
if site.type != id_type:
continue
if site.disabled and not forced:
logger.debug(f"Site {site.name} is disabled, skipping...")
continue
# Results from analysis of this specific site
results_site = {}
# Record URL of main site and username
results_site["username"] = username
results_site["parsing_enabled"] = is_parsing_enabled
results_site["url_main"] = site.url_main
results_site["cookies"] = (
cookie_jar and cookie_jar.filter_cookies(site.url_main) or None
)
headers = {
"User-Agent": get_random_user_agent(),
}
headers.update(site.headers)
if "url" not in site.__dict__:
logger.error("No URL for site %s", site.name)
# URL of user on site (if it exists)
url = site.url.format(
urlMain=site.url_main, urlSubpath=site.url_subpath, username=username
)
# workaround to prevent slash errors
url = re.sub("(?<!:)/+", "/", url)
# Don't make request if username is invalid for the site
if site.regex_check and re.search(site.regex_check, username) is None:
# No need to do the check at the site: this user name is not allowed.
results_site["status"] = QueryResult(
username, site_name, url, QueryStatus.ILLEGAL
)
results_site["url_user"] = ""
results_site["http_status"] = ""
results_site["response_text"] = ""
query_notify.update(results_site["status"])
else:
# URL of user on site (if it exists)
results_site["url_user"] = url
url_probe = site.url_probe
if url_probe is None:
# Probe URL is normal one seen by people out on the web.
url_probe = url
else:
# There is a special URL for probing existence separate
# from where the user profile normally can be found.
url_probe = url_probe.format(
urlMain=site.url_main,
urlSubpath=site.url_subpath,
username=username,
)
for k, v in site.get_params.items():
url_probe += f"&{k}={v}"
if site.check_type == "status_code" and site.request_head_only:
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
request_method = session.head
else:
# Either this detect method needs the content associated
# with the GET response, or this specific website will
# not respond properly unless we request the whole page.
request_method = session.get
if site.check_type == "response_url":
# Site forwards request to a different URL if username not
# found. Disallow the redirect so we can capture the
# http status from the original URL request.
allow_redirects = False
else:
# Allow whatever redirect that the site wants to do.
# The final result of the request will be what is available.
allow_redirects = True
future = request_method(
url=url_probe,
headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
)
# Store future in data for access later
# TODO: move to separate obj
site.request_future = future
# Add this site's results into final dictionary with all of the other results.
results_total[site_name] = results_site
coroutines = []
for sitename, result_obj in results_total.items():
coroutines.append(
(
update_site_dict_from_response,
[sitename, site_dict, result_obj, logger, query_notify],
{},
)
)
# setup parallel executor
executor: Optional[AsyncExecutor] = None
if no_progressbar: if no_progressbar:
executor = AsyncioSimpleExecutor(logger=logger) executor = AsyncioSimpleExecutor(logger=logger)
else: else:
@@ -530,24 +520,68 @@ async def maigret(
logger=logger, in_parallel=max_connections, timeout=timeout + 0.5 logger=logger, in_parallel=max_connections, timeout=timeout + 0.5
) )
results = await executor.run(coroutines) # make options objects for all the requests
options: QueryOptions = {}
options["cookies"] = cookie_jar
options["session"] = session
options["parsing"] = is_parsing_enabled
options["timeout"] = timeout
options["id_type"] = id_type
options["forced"] = forced
# results from analysis of all sites
all_results: Dict[str, QueryResultWrapper] = {}
sites = list(site_dict.keys())
attempts = retries + 1
while attempts:
tasks_dict = {}
for sitename, site in site_dict.items():
if sitename not in sites:
continue
default_result: QueryResultWrapper = {
'site': site,
'status': QueryResult(
username,
sitename,
'',
QueryStatus.UNKNOWN,
error=CheckError('Request failed'),
),
}
tasks_dict[sitename] = (
check_site_for_username,
[site, username, options, logger, query_notify],
{'default': (sitename, default_result)},
)
cur_results = await executor.run(tasks_dict.values())
# wait for executor timeout errors
await asyncio.sleep(1)
all_results.update(cur_results)
sites = get_failed_sites(dict(cur_results))
attempts -= 1
if not sites:
break
if attempts:
query_notify.warning(
f'Restarting checks for {len(sites)} sites... ({attempts} attempts left)'
)
# closing http client session
await session.close() await session.close()
# Notify caller that all queries are finished. # notify caller that all queries are finished
query_notify.finish() query_notify.finish()
data = {} return all_results
for result in results:
# TODO: still can be empty
if result:
try:
data[result[0]] = result[1]
except Exception as e:
logger.error(e, exc_info=True)
logger.info(result)
return data
def timeout_check(value): def timeout_check(value):
@@ -575,7 +609,9 @@ def timeout_check(value):
return timeout return timeout
async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=False): async def site_self_check(
site: MaigretSite, logger, semaphore, db: MaigretDatabase, silent=False
):
changes = { changes = {
"disabled": False, "disabled": False,
} }
@@ -602,6 +638,7 @@ async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=F
id_type=site.type, id_type=site.type,
forced=True, forced=True,
no_progressbar=True, no_progressbar=True,
retries=1,
) )
# don't disable entries with other ids types # don't disable entries with other ids types
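Taken on its own, the new control flow in `maigret()` is a bounded retry loop: run every pending check through the executor, keep only the sites whose error is classified as temporary (see `get_failed_sites`), and go around again until the attempt budget of `retries + 1` runs out. Below is a minimal, self-contained sketch of that pattern; `run_with_retries`, the plain-dict results, and the hard-coded set of temporary error types are illustrative stand-ins, not the real Maigret API.

```python
import asyncio
from typing import Awaitable, Callable, Dict

# Illustrative stand-in: a "check" returns a dict with an "error" field.
TEMPORARY_ERRORS = {"Request timeout", "Connection lost"}


async def run_with_retries(
    checks: Dict[str, Callable[[], Awaitable[dict]]], retries: int = 1
) -> Dict[str, dict]:
    """Run every check once, then re-run only the temporarily failed ones."""
    results: Dict[str, dict] = {}
    pending = list(checks)
    attempts = retries + 1

    while attempts and pending:
        current = await asyncio.gather(*(checks[name]() for name in pending))
        results.update(dict(zip(pending, current)))

        # keep only the sites whose failure looks temporary
        pending = [
            name
            for name, res in zip(pending, current)
            if res.get("error") in TEMPORARY_ERRORS
        ]
        attempts -= 1

    return results


async def main() -> None:
    async def flaky() -> dict:
        return {"error": "Request timeout"}

    async def ok() -> dict:
        return {"error": None, "status": "CLAIMED"}

    print(await run_with_retries({"SiteA": flaky, "SiteB": ok}, retries=2))


asyncio.run(main())
```

The real loop additionally sleeps for a second between rounds so executor timeout errors can settle, and calls `query_notify.warning(...)` before each restart.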
+13 -2
@@ -57,6 +57,17 @@ ERRORS_TYPES = {
'Request timeout': 'Try to increase timeout or to switch to another internet service provider', 'Request timeout': 'Try to increase timeout or to switch to another internet service provider',
} }
TEMPORARY_ERRORS_TYPES = [
'Request timeout',
'Unknown',
'Request failed',
'Connecting failure',
'HTTP',
'Proxy',
'Interrupted',
'Connection lost',
]
THRESHOLD = 3 # percent THRESHOLD = 3 # percent
@@ -64,8 +75,8 @@ def is_important(err_data):
return err_data['perc'] >= THRESHOLD return err_data['perc'] >= THRESHOLD
def is_not_permanent(err_data): def is_permanent(err_type):
return True return err_type not in TEMPORARY_ERRORS_TYPES
def detect(text): def detect(text):
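The retry loop consults this classification through `errors.is_permanent()`: only error types listed in `TEMPORARY_ERRORS_TYPES` are considered worth re-requesting. A standalone copy of the new predicate with a couple of checks (the second error string is purely illustrative):

```python
TEMPORARY_ERRORS_TYPES = [
    'Request timeout',
    'Unknown',
    'Request failed',
    'Connecting failure',
    'HTTP',
    'Proxy',
    'Interrupted',
    'Connection lost',
]


def is_permanent(err_type: str) -> bool:
    # anything not explicitly listed as temporary is treated as permanent
    return err_type not in TEMPORARY_ERRORS_TYPES


assert not is_permanent('Request timeout')        # temporary: worth another attempt
assert is_permanent('Some site-specific error')   # permanent: retrying will not help
```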
+1 -1
@@ -93,7 +93,7 @@ class AsyncioProgressbarQueueExecutor(AsyncExecutor):
try: try:
result = await asyncio.wait_for(query_task, timeout=self.timeout) result = await asyncio.wait_for(query_task, timeout=self.timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
result = None result = kwargs.get('default')
self.results.append(result) self.results.append(result)
self.progress.update(1) self.progress.update(1)
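This one-line executor change is what makes restarts observable: when a task hits the executor timeout, its result is no longer `None` but whatever the caller passed as the `default` keyword (the retry loop passes a placeholder result carrying a "Request failed" error). A simplified sketch of that behaviour, serial rather than parallel and without the progress bar:

```python
import asyncio
from typing import Any, Callable, Dict, List, Tuple

# (func, args, kwargs): the same shape as the executor's task tuples
Task = Tuple[Callable, List, Dict]


async def run_with_defaults(tasks: List[Task], timeout: float) -> List[Any]:
    """Minimal sketch: each task may carry a 'default' returned on timeout."""
    results = []
    for func, args, kwargs in tasks:
        default = kwargs.pop('default', None)
        try:
            results.append(await asyncio.wait_for(func(*args), timeout=timeout))
        except asyncio.TimeoutError:
            results.append(default)
    return results


async def slow() -> str:
    await asyncio.sleep(10)
    return "never reached"


async def fast() -> str:
    return "ok"


print(asyncio.run(run_with_defaults(
    [(slow, [], {'default': 'fallback'}), (fast, [], {})], timeout=0.1
)))  # ['fallback', 'ok']
```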
+15 -2
@@ -59,7 +59,7 @@ def notify_about_errors(search_results, query_notify):
) )
async def main(): def setup_arguments_parser():
version_string = '\n'.join( version_string = '\n'.join(
[ [
f'%(prog)s {__version__}', f'%(prog)s {__version__}',
@@ -148,6 +148,14 @@ async def main():
"A longer timeout will be more likely to get results from slow sites. " "A longer timeout will be more likely to get results from slow sites. "
"On the other hand, this may cause a long delay to gather all results. ", "On the other hand, this may cause a long delay to gather all results. ",
) )
parser.add_argument(
"--retries",
action="store",
type=int,
metavar='RETRIES',
default=1,
help="Attempts to restart temporary failed requests.",
)
parser.add_argument( parser.add_argument(
"-n", "-n",
"--max-connections", "--max-connections",
@@ -334,8 +342,12 @@ async def main():
help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}" help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}"
" (one report per username).", " (one report per username).",
) )
return parser
args = parser.parse_args()
async def main():
arg_parser = setup_arguments_parser()
args = arg_parser.parse_args()
# Logging # Logging
log_level = logging.ERROR log_level = logging.ERROR
@@ -528,6 +540,7 @@ async def main():
forced=args.use_disabled_sites, forced=args.use_disabled_sites,
max_connections=args.connections, max_connections=args.connections,
no_progressbar=args.no_progressbar, no_progressbar=args.no_progressbar,
retries=args.retries,
) )
notify_about_errors(results, query_notify) notify_about_errors(results, query_notify)
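On the command line the new option is used as, e.g., `maigret <username> --retries 2`; internally it follows the usual argparse flow that the extracted `setup_arguments_parser()` makes explicit: build the parser in a helper, parse in `main()`, forward the value to the search coroutine. A minimal standalone sketch of that flow (only the `--retries` option and its defaults mirror the diff; the `search` coroutine is a placeholder):

```python
import argparse
import asyncio


def setup_arguments_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="username search (sketch)")
    parser.add_argument(
        "--retries",
        action="store",
        type=int,
        metavar="RETRIES",
        default=1,
        help="Attempts to restart temporarily failed requests.",
    )
    return parser


async def search(username: str, retries: int = 0) -> None:
    # stand-in for the real search coroutine, which now accepts retries=...
    print(f"searching {username!r} with up to {retries} restart(s)")


async def main() -> None:
    args = setup_arguments_parser().parse_args(["--retries", "2"])
    await search("soxoj", retries=args.retries)


asyncio.run(main())
```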
+4407 -3417
File diff suppressed because it is too large.
+35 -29
@@ -3,7 +3,7 @@
import copy import copy
import json import json
import sys import sys
from typing import Optional from typing import Optional, List, Dict, Any
import requests import requests
@@ -57,9 +57,10 @@ SUPPORTED_TAGS = [
class MaigretEngine: class MaigretEngine:
site: Dict[str, Any] = {}
def __init__(self, name, data): def __init__(self, name, data):
self.name = name self.name = name
self.site = {}
self.__dict__.update(data) self.__dict__.update(data)
@property @property
@@ -78,35 +79,40 @@ class MaigretSite:
"urlRegexp", "urlRegexp",
] ]
username_claimed = ""
username_unclaimed = ""
url_subpath = ""
url_main = ""
url = ""
disabled = False
similar_search = False
ignore403 = False
tags: List[str] = []
type = "username"
headers: Dict[str, str] = {}
errors: Dict[str, str] = {}
activation: Dict[str, Any] = {}
regex_check = None
url_probe = None
check_type = ""
request_head_only = ""
get_params: Dict[str, Any] = {}
presense_strs: List[str] = []
absence_strs: List[str] = []
stats: Dict[str, Any] = {}
engine = None
engine_data: Dict[str, Any] = {}
engine_obj: Optional["MaigretEngine"] = None
request_future = None
alexa_rank = None
source = None
def __init__(self, name, information): def __init__(self, name, information):
self.name = name self.name = name
self.disabled = False
self.similar_search = False
self.ignore403 = False
self.tags = []
self.type = "username"
self.headers = {}
self.errors = {}
self.activation = {}
self.url_subpath = "" self.url_subpath = ""
self.regex_check = None
self.url_probe = None
self.check_type = ""
self.request_head_only = ""
self.get_params = {}
self.presense_strs = []
self.absence_strs = []
self.stats = {}
self.engine = None
self.engine_data = {}
self.engine_obj = None
self.request_future = None
self.alexa_rank = None
self.source = None
for k, v in information.items(): for k, v in information.items():
self.__dict__[CaseConverter.camel_to_snake(k)] = v self.__dict__[CaseConverter.camel_to_snake(k)] = v
@@ -193,7 +199,7 @@ class MaigretSite:
self.url_regexp = None self.url_regexp = None
self_copy = copy.deepcopy(self) self_copy = copy.deepcopy(self)
engine_data = self_copy.engine_obj.site engine_data = self_copy.engine_obj and self_copy.engine_obj.site or {}
site_data_keys = list(self_copy.__dict__.keys()) site_data_keys = list(self_copy.__dict__.keys())
for k in engine_data.keys(): for k in engine_data.keys():
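The `MaigretSite` rewrite replaces a long run of `self.x = default` assignments with annotated class-level defaults; per-site values from the loaded JSON then shadow them in the instance `__dict__`. A reduced sketch of the pattern, with only a handful of fields (the real class also converts camelCase keys to snake_case):

```python
from typing import Any, Dict, List


class Site:
    # class-level defaults; an instance only stores the values its JSON provides
    url_main: str = ""
    disabled: bool = False
    tags: List[str] = []
    headers: Dict[str, str] = {}

    def __init__(self, name: str, information: Dict[str, Any]):
        self.name = name
        for k, v in information.items():
            # per-instance values shadow the class defaults
            self.__dict__[k] = v


site = Site("Example", {"url_main": "https://example.org", "tags": ["global"]})
print(site.url_main, site.tags)      # https://example.org ['global']
print(site.disabled, Site.disabled)  # False False (missing keys fall back to class defaults)

# caveat of the pattern: mutable class defaults (lists, dicts) are shared between
# instances until overridden, so they should be treated as read-only.
```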
+8 -2
@@ -1,5 +1,11 @@
from typing import Callable, Any, Tuple from typing import Callable, List, Dict, Tuple, Any
# search query # search query
QueryDraft = Tuple[Callable, Any, Any] QueryDraft = Tuple[Callable, List, Dict]
# options dict
QueryOptions = Dict[str, Any]
# TODO: throw out
QueryResultWrapper = Dict[str, Any]
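The new aliases are plain `typing` shorthands, so they change nothing at runtime; their point is to make signatures such as `make_site_result(...) -> QueryResultWrapper` and the executor task tuples self-describing. A small sketch of how they read in annotations (function bodies here are placeholders, not the real implementations):

```python
from typing import Any, Callable, Dict, List, Tuple

QueryDraft = Tuple[Callable, List, Dict]  # (func, args, kwargs) tuple fed to an executor
QueryOptions = Dict[str, Any]             # per-run settings: session, timeout, id_type, ...
QueryResultWrapper = Dict[str, Any]       # per-site bundle: site, username, status, future, ...


def make_site_result(site: str, username: str, options: QueryOptions) -> QueryResultWrapper:
    # placeholder body; the real function also prepares the HTTP request future
    return {"site": site, "username": username, "parsing_enabled": options.get("parsing", False)}


def as_task(site: str, username: str, options: QueryOptions) -> QueryDraft:
    return (make_site_result, [site, username, options], {})


func, args, kwargs = as_task("GitHub", "soxoj", {"parsing": True, "timeout": 10})
print(func(*args, **kwargs))
```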
+1168 -1154
File diff suppressed because it is too large.
+2 -1
@@ -26,7 +26,8 @@ def get_test_reports_filenames():
def remove_test_reports(): def remove_test_reports():
reports_list = get_test_reports_filenames() reports_list = get_test_reports_filenames()
for f in reports_list: os.remove(f) for f in reports_list:
os.remove(f)
logging.error(f'Removed test reports {reports_list}') logging.error(f'Removed test reports {reports_list}')
+3 -2
@@ -44,8 +44,9 @@ async def test_import_aiohttp_cookies():
url = 'https://httpbin.org/cookies' url = 'https://httpbin.org/cookies'
connector = aiohttp.TCPConnector(ssl=False) connector = aiohttp.TCPConnector(ssl=False)
session = aiohttp.ClientSession(connector=connector, trust_env=True, session = aiohttp.ClientSession(
cookie_jar=cookie_jar) connector=connector, trust_env=True, cookie_jar=cookie_jar
)
response = await session.get(url=url) response = await session.get(url=url)
result = json.loads(await response.content.read()) result = json.loads(await response.content.read())
+9 -3
@@ -2,11 +2,16 @@
import pytest import pytest
import asyncio import asyncio
import logging import logging
from maigret.executors import AsyncioSimpleExecutor, AsyncioProgressbarExecutor, \ from maigret.executors import (
AsyncioProgressbarSemaphoreExecutor, AsyncioProgressbarQueueExecutor AsyncioSimpleExecutor,
AsyncioProgressbarExecutor,
AsyncioProgressbarSemaphoreExecutor,
AsyncioProgressbarQueueExecutor,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def func(n): async def func(n):
await asyncio.sleep(0.1 * (n % 3)) await asyncio.sleep(0.1 * (n % 3))
return n return n
@@ -20,6 +25,7 @@ async def test_simple_asyncio_executor():
assert executor.execution_time > 0.2 assert executor.execution_time > 0.2
assert executor.execution_time < 0.3 assert executor.execution_time < 0.3
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_asyncio_progressbar_executor(): async def test_asyncio_progressbar_executor():
tasks = [(func, [n], {}) for n in range(10)] tasks = [(func, [n], {}) for n in range(10)]
@@ -64,4 +70,4 @@ async def test_asyncio_progressbar_queue_executor():
executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=10) executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=10)
assert await executor.run(tasks) == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8] assert await executor.run(tasks) == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8]
assert executor.execution_time > 0.2 assert executor.execution_time > 0.2
assert executor.execution_time < 0.3 assert executor.execution_time < 0.3
+7 -17
@@ -8,40 +8,30 @@ from maigret.maigret import self_check
from maigret.sites import MaigretDatabase from maigret.sites import MaigretDatabase
EXAMPLE_DB = { EXAMPLE_DB = {
'engines': { 'engines': {},
},
'sites': { 'sites': {
"GooglePlayStore": { "GooglePlayStore": {
"tags": [ "tags": ["global", "us"],
"global",
"us"
],
"disabled": False, "disabled": False,
"checkType": "status_code", "checkType": "status_code",
"alexaRank": 1, "alexaRank": 1,
"url": "https://play.google.com/store/apps/developer?id={username}", "url": "https://play.google.com/store/apps/developer?id={username}",
"urlMain": "https://play.google.com/store", "urlMain": "https://play.google.com/store",
"usernameClaimed": "Facebook_nosuchname", "usernameClaimed": "Facebook_nosuchname",
"usernameUnclaimed": "noonewouldeverusethis7" "usernameUnclaimed": "noonewouldeverusethis7",
}, },
"Reddit": { "Reddit": {
"tags": [ "tags": ["news", "social", "us"],
"news",
"social",
"us"
],
"checkType": "status_code", "checkType": "status_code",
"presenseStrs": [ "presenseStrs": ["totalKarma"],
"totalKarma"
],
"disabled": True, "disabled": True,
"alexaRank": 17, "alexaRank": 17,
"url": "https://www.reddit.com/user/{username}", "url": "https://www.reddit.com/user/{username}",
"urlMain": "https://www.reddit.com/", "urlMain": "https://www.reddit.com/",
"usernameClaimed": "blue", "usernameClaimed": "blue",
"usernameUnclaimed": "noonewouldeverusethis7" "usernameUnclaimed": "noonewouldeverusethis7",
}, },
} },
} }
+202 -67
@@ -7,8 +7,16 @@ from io import StringIO
import xmind import xmind
from jinja2 import Template from jinja2 import Template
from maigret.report import generate_csv_report, generate_txt_report, save_xmind_report, save_html_report, \ from maigret.report import (
save_pdf_report, generate_report_template, generate_report_context, generate_json_report generate_csv_report,
generate_txt_report,
save_xmind_report,
save_html_report,
save_pdf_report,
generate_report_template,
generate_report_context,
generate_json_report,
)
from maigret.result import QueryResult, QueryStatus from maigret.result import QueryResult, QueryStatus
EXAMPLE_RESULTS = { EXAMPLE_RESULTS = {
@@ -17,14 +25,16 @@ EXAMPLE_RESULTS = {
'parsing_enabled': True, 'parsing_enabled': True,
'url_main': 'https://www.github.com/', 'url_main': 'https://www.github.com/',
'url_user': 'https://www.github.com/test', 'url_user': 'https://www.github.com/test',
'status': QueryResult('test', 'status': QueryResult(
'GitHub', 'test',
'https://www.github.com/test', 'GitHub',
QueryStatus.CLAIMED, 'https://www.github.com/test',
tags=['test_tag']), QueryStatus.CLAIMED,
tags=['test_tag'],
),
'http_status': 200, 'http_status': 200,
'is_similar': False, 'is_similar': False,
'rank': 78 'rank': 78,
} }
} }
@@ -33,74 +43,196 @@ BAD_RESULT = QueryResult('', '', '', QueryStatus.AVAILABLE)
GOOD_500PX_RESULT = copy.deepcopy(GOOD_RESULT) GOOD_500PX_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_500PX_RESULT.tags = ['photo', 'us', 'global'] GOOD_500PX_RESULT.tags = ['photo', 'us', 'global']
GOOD_500PX_RESULT.ids_data = {"uid": "dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==", "legacy_id": "26403415", GOOD_500PX_RESULT.ids_data = {
"username": "alexaimephotographycars", "name": "Alex Aim\u00e9", "uid": "dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==",
"website": "www.flickr.com/photos/alexaimephotography/", "legacy_id": "26403415",
"facebook_link": " www.instagram.com/street.reality.photography/", "username": "alexaimephotographycars",
"instagram_username": "alexaimephotography", "twitter_username": "Alexaimephotogr"} "name": "Alex Aim\u00e9",
"website": "www.flickr.com/photos/alexaimephotography/",
"facebook_link": " www.instagram.com/street.reality.photography/",
"instagram_username": "alexaimephotography",
"twitter_username": "Alexaimephotogr",
}
GOOD_REDDIT_RESULT = copy.deepcopy(GOOD_RESULT) GOOD_REDDIT_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_REDDIT_RESULT.tags = ['news', 'us'] GOOD_REDDIT_RESULT.tags = ['news', 'us']
GOOD_REDDIT_RESULT.ids_data = {"reddit_id": "t5_1nytpy", "reddit_username": "alexaimephotography", GOOD_REDDIT_RESULT.ids_data = {
"fullname": "alexaimephotography", "reddit_id": "t5_1nytpy",
"image": "https://styles.redditmedia.com/t5_1nytpy/styles/profileIcon_7vmhdwzd3g931.jpg?width=256&height=256&crop=256:256,smart&frame=1&s=4f355f16b4920844a3f4eacd4237a7bf76b2e97e", "reddit_username": "alexaimephotography",
"is_employee": "False", "is_nsfw": "False", "is_mod": "True", "is_following": "True", "fullname": "alexaimephotography",
"has_user_profile": "True", "hide_from_robots": "False", "image": "https://styles.redditmedia.com/t5_1nytpy/styles/profileIcon_7vmhdwzd3g931.jpg?width=256&height=256&crop=256:256,smart&frame=1&s=4f355f16b4920844a3f4eacd4237a7bf76b2e97e",
"created_at": "2019-07-10 12:20:03", "total_karma": "53959", "post_karma": "52738"} "is_employee": "False",
"is_nsfw": "False",
"is_mod": "True",
"is_following": "True",
"has_user_profile": "True",
"hide_from_robots": "False",
"created_at": "2019-07-10 12:20:03",
"total_karma": "53959",
"post_karma": "52738",
}
GOOD_IG_RESULT = copy.deepcopy(GOOD_RESULT) GOOD_IG_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_IG_RESULT.tags = ['photo', 'global'] GOOD_IG_RESULT.tags = ['photo', 'global']
GOOD_IG_RESULT.ids_data = {"instagram_username": "alexaimephotography", "fullname": "Alexaimephotography", GOOD_IG_RESULT.ids_data = {
"id": "6828488620", "instagram_username": "alexaimephotography",
"image": "https://scontent-hel3-1.cdninstagram.com/v/t51.2885-19/s320x320/95420076_1169632876707608_8741505804647006208_n.jpg?_nc_ht=scontent-hel3-1.cdninstagram.com&_nc_ohc=jd87OUGsX4MAX_Ym5GX&tp=1&oh=0f42badd68307ba97ec7fb1ef7b4bfd4&oe=601E5E6F", "fullname": "Alexaimephotography",
"bio": "Photographer \nChild of fine street arts", "id": "6828488620",
"external_url": "https://www.flickr.com/photos/alexaimephotography2020/"} "image": "https://scontent-hel3-1.cdninstagram.com/v/t51.2885-19/s320x320/95420076_1169632876707608_8741505804647006208_n.jpg?_nc_ht=scontent-hel3-1.cdninstagram.com&_nc_ohc=jd87OUGsX4MAX_Ym5GX&tp=1&oh=0f42badd68307ba97ec7fb1ef7b4bfd4&oe=601E5E6F",
"bio": "Photographer \nChild of fine street arts",
"external_url": "https://www.flickr.com/photos/alexaimephotography2020/",
}
GOOD_TWITTER_RESULT = copy.deepcopy(GOOD_RESULT) GOOD_TWITTER_RESULT = copy.deepcopy(GOOD_RESULT)
GOOD_TWITTER_RESULT.tags = ['social', 'us'] GOOD_TWITTER_RESULT.tags = ['social', 'us']
TEST = [('alexaimephotographycars', 'username', { TEST = [
'500px': {'username': 'alexaimephotographycars', 'parsing_enabled': True, 'url_main': 'https://500px.com/', (
'url_user': 'https://500px.com/p/alexaimephotographycars', 'alexaimephotographycars',
'ids_usernames': {'alexaimephotographycars': 'username', 'alexaimephotography': 'username', 'username',
'Alexaimephotogr': 'username'}, 'status': GOOD_500PX_RESULT, 'http_status': 200, {
'is_similar': False, 'rank': 2981}, '500px': {
'Reddit': {'username': 'alexaimephotographycars', 'parsing_enabled': True, 'url_main': 'https://www.reddit.com/', 'username': 'alexaimephotographycars',
'url_user': 'https://www.reddit.com/user/alexaimephotographycars', 'status': BAD_RESULT, 'parsing_enabled': True,
'http_status': 404, 'is_similar': False, 'rank': 17}, 'url_main': 'https://500px.com/',
'Twitter': {'username': 'alexaimephotographycars', 'parsing_enabled': True, 'url_main': 'https://www.twitter.com/', 'url_user': 'https://500px.com/p/alexaimephotographycars',
'url_user': 'https://twitter.com/alexaimephotographycars', 'status': BAD_RESULT, 'http_status': 400, 'ids_usernames': {
'is_similar': False, 'rank': 55}, 'alexaimephotographycars': 'username',
'Instagram': {'username': 'alexaimephotographycars', 'parsing_enabled': True, 'alexaimephotography': 'username',
'url_main': 'https://www.instagram.com/', 'Alexaimephotogr': 'username',
'url_user': 'https://www.instagram.com/alexaimephotographycars', 'status': BAD_RESULT, },
'http_status': 404, 'is_similar': False, 'rank': 29}}), ('alexaimephotography', 'username', { 'status': GOOD_500PX_RESULT,
'500px': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://500px.com/', 'http_status': 200,
'url_user': 'https://500px.com/p/alexaimephotography', 'status': BAD_RESULT, 'http_status': 200, 'is_similar': False,
'is_similar': False, 'rank': 2981}, 'rank': 2981,
'Reddit': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://www.reddit.com/', },
'url_user': 'https://www.reddit.com/user/alexaimephotography', 'Reddit': {
'ids_usernames': {'alexaimephotography': 'username'}, 'status': GOOD_REDDIT_RESULT, 'http_status': 200, 'username': 'alexaimephotographycars',
'is_similar': False, 'rank': 17}, 'parsing_enabled': True,
'Twitter': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://www.twitter.com/', 'url_main': 'https://www.reddit.com/',
'url_user': 'https://twitter.com/alexaimephotography', 'status': BAD_RESULT, 'http_status': 400, 'url_user': 'https://www.reddit.com/user/alexaimephotographycars',
'is_similar': False, 'rank': 55}, 'status': BAD_RESULT,
'Instagram': {'username': 'alexaimephotography', 'parsing_enabled': True, 'url_main': 'https://www.instagram.com/', 'http_status': 404,
'url_user': 'https://www.instagram.com/alexaimephotography', 'is_similar': False,
'ids_usernames': {'alexaimephotography': 'username'}, 'status': GOOD_IG_RESULT, 'http_status': 200, 'rank': 17,
'is_similar': False, 'rank': 29}}), ('Alexaimephotogr', 'username', { },
'500px': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://500px.com/', 'Twitter': {
'url_user': 'https://500px.com/p/Alexaimephotogr', 'status': BAD_RESULT, 'http_status': 200, 'username': 'alexaimephotographycars',
'is_similar': False, 'rank': 2981}, 'parsing_enabled': True,
'Reddit': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://www.reddit.com/', 'url_main': 'https://www.twitter.com/',
'url_user': 'https://www.reddit.com/user/Alexaimephotogr', 'status': BAD_RESULT, 'http_status': 404, 'url_user': 'https://twitter.com/alexaimephotographycars',
'is_similar': False, 'rank': 17}, 'status': BAD_RESULT,
'Twitter': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://www.twitter.com/', 'http_status': 400,
'url_user': 'https://twitter.com/Alexaimephotogr', 'status': GOOD_TWITTER_RESULT, 'http_status': 400, 'is_similar': False,
'is_similar': False, 'rank': 55}, 'rank': 55,
'Instagram': {'username': 'Alexaimephotogr', 'parsing_enabled': True, 'url_main': 'https://www.instagram.com/', },
'url_user': 'https://www.instagram.com/Alexaimephotogr', 'status': BAD_RESULT, 'http_status': 404, 'Instagram': {
'is_similar': False, 'rank': 29}})] 'username': 'alexaimephotographycars',
'parsing_enabled': True,
'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/alexaimephotographycars',
'status': BAD_RESULT,
'http_status': 404,
'is_similar': False,
'rank': 29,
},
},
),
(
'alexaimephotography',
'username',
{
'500px': {
'username': 'alexaimephotography',
'parsing_enabled': True,
'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/alexaimephotography',
'status': BAD_RESULT,
'http_status': 200,
'is_similar': False,
'rank': 2981,
},
'Reddit': {
'username': 'alexaimephotography',
'parsing_enabled': True,
'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/alexaimephotography',
'ids_usernames': {'alexaimephotography': 'username'},
'status': GOOD_REDDIT_RESULT,
'http_status': 200,
'is_similar': False,
'rank': 17,
},
'Twitter': {
'username': 'alexaimephotography',
'parsing_enabled': True,
'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/alexaimephotography',
'status': BAD_RESULT,
'http_status': 400,
'is_similar': False,
'rank': 55,
},
'Instagram': {
'username': 'alexaimephotography',
'parsing_enabled': True,
'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/alexaimephotography',
'ids_usernames': {'alexaimephotography': 'username'},
'status': GOOD_IG_RESULT,
'http_status': 200,
'is_similar': False,
'rank': 29,
},
},
),
(
'Alexaimephotogr',
'username',
{
'500px': {
'username': 'Alexaimephotogr',
'parsing_enabled': True,
'url_main': 'https://500px.com/',
'url_user': 'https://500px.com/p/Alexaimephotogr',
'status': BAD_RESULT,
'http_status': 200,
'is_similar': False,
'rank': 2981,
},
'Reddit': {
'username': 'Alexaimephotogr',
'parsing_enabled': True,
'url_main': 'https://www.reddit.com/',
'url_user': 'https://www.reddit.com/user/Alexaimephotogr',
'status': BAD_RESULT,
'http_status': 404,
'is_similar': False,
'rank': 17,
},
'Twitter': {
'username': 'Alexaimephotogr',
'parsing_enabled': True,
'url_main': 'https://www.twitter.com/',
'url_user': 'https://twitter.com/Alexaimephotogr',
'status': GOOD_TWITTER_RESULT,
'http_status': 400,
'is_similar': False,
'rank': 55,
},
'Instagram': {
'username': 'Alexaimephotogr',
'parsing_enabled': True,
'url_main': 'https://www.instagram.com/',
'url_user': 'https://www.instagram.com/Alexaimephotogr',
'status': BAD_RESULT,
'http_status': 404,
'is_similar': False,
'rank': 29,
},
},
),
]
SUPPOSED_BRIEF = """Search by username alexaimephotographycars returned 1 accounts. Found target's other IDs: alexaimephotography, Alexaimephotogr. Search by username alexaimephotography returned 2 accounts. Search by username Alexaimephotogr returned 1 accounts. Extended info extracted from 3 accounts.""" SUPPOSED_BRIEF = """Search by username alexaimephotographycars returned 1 accounts. Found target's other IDs: alexaimephotography, Alexaimephotogr. Search by username alexaimephotography returned 2 accounts. Search by username Alexaimephotogr returned 1 accounts. Extended info extracted from 3 accounts."""
@@ -187,7 +319,10 @@ def test_save_xmind_report():
assert data['topic']['topics'][0]['title'] == 'Undefined' assert data['topic']['topics'][0]['title'] == 'Undefined'
assert data['topic']['topics'][1]['title'] == 'test_tag' assert data['topic']['topics'][1]['title'] == 'test_tag'
assert len(data['topic']['topics'][1]['topics']) == 1 assert len(data['topic']['topics'][1]['topics']) == 1
assert data['topic']['topics'][1]['topics'][0]['label'] == 'https://www.github.com/test' assert (
data['topic']['topics'][1]['topics'][0]['label']
== 'https://www.github.com/test'
)
def test_html_report(): def test_html_report():
+14 -12
@@ -10,25 +10,21 @@ EXAMPLE_DB = {
"The specified member cannot be found. Please enter a member's entire name.", "The specified member cannot be found. Please enter a member's entire name.",
], ],
"checkType": "message", "checkType": "message",
"errors": { "errors": {"You must be logged-in to do that.": "Login required"},
"You must be logged-in to do that.": "Login required" "url": "{urlMain}{urlSubpath}/members/?username={username}",
}, },
"url": "{urlMain}{urlSubpath}/members/?username={username}"
}
}, },
}, },
'sites': { 'sites': {
"Amperka": { "Amperka": {
"engine": "XenForo", "engine": "XenForo",
"rank": 121613, "rank": 121613,
"tags": [ "tags": ["ru"],
"ru"
],
"urlMain": "http://forum.amperka.ru", "urlMain": "http://forum.amperka.ru",
"usernameClaimed": "adam", "usernameClaimed": "adam",
"usernameUnclaimed": "noonewouldeverusethis7" "usernameUnclaimed": "noonewouldeverusethis7",
}, },
} },
} }
@@ -116,8 +112,14 @@ def test_site_url_detector():
db = MaigretDatabase() db = MaigretDatabase()
db.load_from_json(EXAMPLE_DB) db.load_from_json(EXAMPLE_DB)
assert db.sites[0].url_regexp.pattern == r'^https?://(www.)?forum\.amperka\.ru/members/\?username=(.+?)$' assert (
assert db.sites[0].detect_username('http://forum.amperka.ru/members/?username=test') == 'test' db.sites[0].url_regexp.pattern
== r'^https?://(www.)?forum\.amperka\.ru/members/\?username=(.+?)$'
)
assert (
db.sites[0].detect_username('http://forum.amperka.ru/members/?username=test')
== 'test'
)
def test_ranked_sites_dict(): def test_ranked_sites_dict():
+33 -7
@@ -2,7 +2,13 @@
import itertools import itertools
import re import re
from maigret.utils import CaseConverter, is_country_tag, enrich_link_str, URLMatcher, get_dict_ascii_tree from maigret.utils import (
CaseConverter,
is_country_tag,
enrich_link_str,
URLMatcher,
get_dict_ascii_tree,
)
def test_case_convert_camel_to_snake(): def test_case_convert_camel_to_snake():
@@ -45,8 +51,10 @@ def test_is_country_tag():
def test_enrich_link_str(): def test_enrich_link_str():
assert enrich_link_str('test') == 'test' assert enrich_link_str('test') == 'test'
assert enrich_link_str( assert (
' www.flickr.com/photos/alexaimephotography/') == '<a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>' enrich_link_str(' www.flickr.com/photos/alexaimephotography/')
== '<a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>'
)
def test_url_extract_main_part(): def test_url_extract_main_part():
@@ -78,15 +86,32 @@ def test_url_make_profile_url_regexp():
for url_parts in itertools.product(*parts): for url_parts in itertools.product(*parts):
url = ''.join(url_parts) url = ''.join(url_parts)
assert URLMatcher.make_profile_url_regexp(url).pattern == r'^https?://(www.)?flickr\.com/photos/(.+?)$' assert (
URLMatcher.make_profile_url_regexp(url).pattern
== r'^https?://(www.)?flickr\.com/photos/(.+?)$'
)
def test_get_dict_ascii_tree(): def test_get_dict_ascii_tree():
data = {'uid': 'dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==', 'legacy_id': '26403415', 'username': 'alexaimephotographycars', 'name': 'Alex Aimé', 'created_at': '2018-05-04T10:17:01.000+0000', 'image': 'https://drscdn.500px.org/user_avatar/26403415/q%3D85_w%3D300_h%3D300/v2?webp=true&v=2&sig=0235678a4f7b65e007e864033ebfaf5ef6d87fad34f80a8639d985320c20fe3b', 'image_bg': 'https://drscdn.500px.org/user_cover/26403415/q%3D65_m%3D2048/v2?webp=true&v=1&sig=bea411fb158391a4fdad498874ff17088f91257e59dfb376ff67e3a44c3a4201', 'website': 'www.instagram.com/street.reality.photography/', 'facebook_link': ' www.instagram.com/street.reality.photography/', 'instagram_username': 'Street.Reality.Photography', 'twitter_username': 'Alexaimephotogr'} data = {
'uid': 'dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==',
'legacy_id': '26403415',
'username': 'alexaimephotographycars',
'name': 'Alex Aimé',
'created_at': '2018-05-04T10:17:01.000+0000',
'image': 'https://drscdn.500px.org/user_avatar/26403415/q%3D85_w%3D300_h%3D300/v2?webp=true&v=2&sig=0235678a4f7b65e007e864033ebfaf5ef6d87fad34f80a8639d985320c20fe3b',
'image_bg': 'https://drscdn.500px.org/user_cover/26403415/q%3D65_m%3D2048/v2?webp=true&v=1&sig=bea411fb158391a4fdad498874ff17088f91257e59dfb376ff67e3a44c3a4201',
'website': 'www.instagram.com/street.reality.photography/',
'facebook_link': ' www.instagram.com/street.reality.photography/',
'instagram_username': 'Street.Reality.Photography',
'twitter_username': 'Alexaimephotogr',
}
ascii_tree = get_dict_ascii_tree(data.items()) ascii_tree = get_dict_ascii_tree(data.items())
assert ascii_tree == """ assert (
ascii_tree
== """
uid: dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ== uid: dXJpOm5vZGU6VXNlcjoyNjQwMzQxNQ==
legacy_id: 26403415 legacy_id: 26403415
username: alexaimephotographycars username: alexaimephotographycars
@@ -97,4 +122,5 @@ def test_get_dict_ascii_tree():
website: www.instagram.com/street.reality.photography/ website: www.instagram.com/street.reality.photography/
facebook_link: www.instagram.com/street.reality.photography/ facebook_link: www.instagram.com/street.reality.photography/
instagram_username: Street.Reality.Photography instagram_username: Street.Reality.Photography
twitter_username: Alexaimephotogr""" twitter_username: Alexaimephotogr"""
)