diff --git a/maigret/maigret.py b/maigret/maigret.py index 0d8f07c..f7a40b3 100755 --- a/maigret/maigret.py +++ b/maigret/maigret.py @@ -25,7 +25,7 @@ from socid_extractor import parse, extract from .notify import QueryNotifyPrint from .result import QueryResult, QueryStatus -from .sites import SitesInformation +from .sites import MaigretDatabase, MaigretSite import xmind @@ -55,7 +55,7 @@ unsupported_characters = '#' cookies_file = 'cookies.txt' -async def get_response(request_future, social_network, logger): +async def get_response(request_future, site_name, logger): html_text = None status_code = 0 @@ -92,7 +92,7 @@ async def get_response(request_future, social_network, logger): error_text = "Proxy Error" expection_text = str(err) except Exception as err: - logger.warning(f'Unhandled error while requesting {social_network}: {err}') + logger.warning(f'Unhandled error while requesting {site_name}: {err}') logger.debug(err, exc_info=True) error_text = "Some Error" expection_text = str(err) @@ -101,19 +101,19 @@ async def get_response(request_future, social_network, logger): return html_text, status_code, error_text, expection_text -async def update_site_data_from_response(sitename, site_data, results_info, semaphore, logger, query_notify): +async def update_site_dict_from_response(sitename, site_dict, results_info, semaphore, logger, query_notify): async with semaphore: - site_obj = site_data[sitename] - future = site_obj.get('request_future') + site_obj = site_dict[sitename] + future = site_obj.request_future if not future: # ignore: search by incompatible id type return response = await get_response(request_future=future, - social_network=sitename, + site_name=sitename, logger=logger) - site_data[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj, sitename) + site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj) # TODO: move info separate module @@ -137,13 +137,11 @@ def detect_error_page(html_text, status_code, fail_flags, ignore_403): return None, None -def process_site_result(response, query_notify, logger, results_info, net_info, social_network): +def process_site_result(response, query_notify, logger, results_info, site: MaigretSite): if not response: return results_info - fulltags = [] - if ("tags" in net_info.keys()): - fulltags = net_info["tags"] + fulltags = site.tags # Retrieve other site information again username = results_info['username'] @@ -157,14 +155,14 @@ def process_site_result(response, query_notify, logger, results_info, net_info, return results_info # Get the expected error type - error_type = net_info["errorType"] + error_type = site.check_type # Get the failure messages and comments - failure_errors = net_info.get("errors", {}) + failure_errors = site.errors # TODO: refactor if not response: - logger.error(f'No response for {social_network}') + logger.error(f'No response for {site.name}') return results_info html_text, status_code, error_text, expection_text = response @@ -182,37 +180,37 @@ def process_site_result(response, query_notify, logger, results_info, net_info, if status_code and not error_text: error_text, site_error_text = detect_error_page(html_text, status_code, failure_errors, - 'ignore_403' in net_info) + site.ignore_403) # presense flags # True by default - presense_flags = net_info.get("presenseStrs", []) + presense_flags = site.presense_strs is_presense_detected = html_text and all( [(presense_flag in html_text) for presense_flag in presense_flags]) or not presense_flags if error_text is not None: logger.debug(error_text) result = QueryResult(username, - social_network, + site.name, url, QueryStatus.UNKNOWN, query_time=response_time, context=f'{error_text}: {site_error_text}', tags=fulltags) elif error_type == "message": - absence_flags = net_info.get("errorMsg") + absence_flags = site.absence_strs is_absence_flags_list = isinstance(absence_flags, list) absence_flags_set = set(absence_flags) if is_absence_flags_list else {absence_flags} # Checks if the error message is in the HTML is_absence_detected = any([(absence_flag in html_text) for absence_flag in absence_flags_set]) if not is_absence_detected and is_presense_detected: result = QueryResult(username, - social_network, + site.name, url, QueryStatus.CLAIMED, query_time=response_time, tags=fulltags) else: result = QueryResult(username, - social_network, + site.name, url, QueryStatus.AVAILABLE, query_time=response_time, tags=fulltags) @@ -220,13 +218,13 @@ def process_site_result(response, query_notify, logger, results_info, net_info, # Checks if the status code of the response is 2XX if (not status_code >= 300 or status_code < 200) and is_presense_detected: result = QueryResult(username, - social_network, + site.name, url, QueryStatus.CLAIMED, query_time=response_time, tags=fulltags) else: result = QueryResult(username, - social_network, + site.name, url, QueryStatus.AVAILABLE, query_time=response_time, tags=fulltags) @@ -238,20 +236,20 @@ def process_site_result(response, query_notify, logger, results_info, net_info, # forward to some odd redirect). if 200 <= status_code < 300 and is_presense_detected: result = QueryResult(username, - social_network, + site.name, url, QueryStatus.CLAIMED, query_time=response_time, tags=fulltags) else: result = QueryResult(username, - social_network, + site.name, url, QueryStatus.AVAILABLE, query_time=response_time, tags=fulltags) else: # It should be impossible to ever get here... raise ValueError(f"Unknown Error Type '{error_type}' for " - f"site '{social_network}'") + f"site '{site_name}'") extracted_ids_data = {} @@ -259,7 +257,7 @@ def process_site_result(response, query_notify, logger, results_info, net_info, try: extracted_ids_data = extract(html_text) except Exception as e: - logger.warning(f'Error while parsing {social_network}: {e}', exc_info=True) + logger.warning(f'Error while parsing {site_name}: {e}', exc_info=True) if extracted_ids_data: new_usernames = {} @@ -272,22 +270,21 @@ def process_site_result(response, query_notify, logger, results_info, net_info, results_info['ids_usernames'] = new_usernames result.ids_data = extracted_ids_data - is_similar = net_info.get('similarSearch', False) # Notify caller about results of query. - query_notify.update(result, is_similar) + query_notify.update(result, site.similar_search) # Save status of request results_info['status'] = result # Save results from request results_info['http_status'] = status_code - results_info['is_similar'] = is_similar + results_info['is_similar'] = site.similar_search # results_site['response_text'] = html_text - results_info['rank'] = net_info.get('rank', 0) + results_info['rank'] = site.popularity_rank return results_info -async def maigret(username, site_data, query_notify, logger, +async def maigret(username, site_dict, query_notify, logger, proxy=None, timeout=None, recursive_search=False, id_type='username', tags=None, debug=False, forced=False, max_connections=100): @@ -298,7 +295,7 @@ async def maigret(username, site_data, query_notify, logger, Keyword Arguments: username -- String indicating username that report should be created against. - site_data -- Dictionary containing all of the site data. + site_dict -- Dictionary containing all of the site data. query_notify -- Object with base type of QueryNotify(). This will be used to notify the caller about query results. @@ -345,21 +342,19 @@ async def maigret(username, site_data, query_notify, logger, results_total = {} # First create futures for all requests. This allows for the requests to run in parallel - for social_network, net_info in site_data.items(): + for site_name, site in site_dict.items(): - fulltags = [] - if ("tags" in net_info.keys()): - fulltags = net_info["tags"] + fulltags = site.tags - if net_info.get('type', 'username') != id_type: + if site.type != id_type: continue - site_tags = set(net_info.get('tags', [])) + site_tags = set(fulltags) if tags: if not set(tags).intersection(site_tags): continue - if 'disabled' in net_info and net_info['disabled'] and not forced: + if site.disabled and not forced: continue # Results from analysis of this specific site @@ -368,32 +363,29 @@ async def maigret(username, site_data, query_notify, logger, # Record URL of main site and username results_site['username'] = username results_site['parsing_enabled'] = recursive_search - results_site['url_main'] = net_info.get("urlMain") + results_site['url_main'] = site.url_main headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11.1; rv:55.0) Gecko/20100101 Firefox/55.0', } - if "headers" in net_info: - # Override/append any extra headers required by a given site. - headers.update(net_info["headers"]) + headers.update(site.headers) # URL of user on site (if it exists) - url = net_info.get('url').format( - urlMain=net_info['urlMain'], - urlSubpath=net_info.get('urlSubpath', ''), + url = site.url_username_format.format( + urlMain=site.url_main, + urlSubpath=site.url_subpath, username=username ) # workaround to prevent slash errors url = url.replace('///', '/') # Don't make request if username is invalid for the site - regex_check = net_info.get("regexCheck") - if regex_check and re.search(regex_check, username) is None: + if site.regex_check and re.search(site.regex_check, username) is None: # No need to do the check at the site: this user name is not allowed. results_site['status'] = QueryResult(username, - social_network, + site_name, url, QueryStatus.ILLEGAL) results_site["url_user"] = "" @@ -403,7 +395,7 @@ async def maigret(username, site_data, query_notify, logger, else: # URL of user on site (if it exists) results_site["url_user"] = url - url_probe = net_info.get("urlProbe") + url_probe = site.url_probe if url_probe is None: # Probe URL is normal one seen by people out on the web. url_probe = url @@ -411,13 +403,13 @@ async def maigret(username, site_data, query_notify, logger, # There is a special URL for probing existence separate # from where the user profile normally can be found. url_probe = url_probe.format( - urlMain=net_info['urlMain'], - urlSubpath=net_info.get('urlSubpath', ''), + urlMain=site.url_main, + urlSubpath=site.url_subpath, username=username, ) - if net_info["errorType"] == 'status_code' and net_info.get("request_head_only", True): + if site.check_type == 'status_code' and site.request_head_only: # In most cases when we are detecting by status code, # it is not necessary to get the entire body: we can # detect fine with just the HEAD response. @@ -428,7 +420,7 @@ async def maigret(username, site_data, query_notify, logger, # not respond properly unless we request the whole page. request_method = session.get - if net_info["errorType"] == "response_url": + if site.check_type == "response_url": # Site forwards request to a different URL if username not # found. Disallow the redirect so we can capture the # http status from the original URL request. @@ -454,10 +446,11 @@ async def maigret(username, site_data, query_notify, logger, ) # Store future in data for access later - net_info["request_future"] = future + # TODO: move to separate obj + site.request_future = future # Add this site's results into final dictionary with all of the other results. - results_total[social_network] = results_site + results_total[site_name] = results_site # TODO: move into top-level function @@ -465,7 +458,7 @@ async def maigret(username, site_data, query_notify, logger, tasks = [] for sitename, result_obj in results_total.items(): - update_site_coro = update_site_data_from_response(sitename, site_data, result_obj, sem, logger, query_notify) + update_site_coro = update_site_dict_from_response(sitename, site_dict, result_obj, sem, logger, query_notify) future = asyncio.ensure_future(update_site_coro) tasks.append(future) @@ -553,8 +546,9 @@ async def site_self_check(site_name, site_data, logger): async def self_check(json_file, logger): - data = json.load(open(json_file)) - sites = SitesInformation(json_file) + db = MaigretDatabase() + db.load_from_file(json_file) + sites = db.sites all_sites = {} def disabled_count(data): @@ -825,18 +819,11 @@ async def main(): # Create object with all information about sites we are aware of. try: - sites = SitesInformation(args.json_file) + site_data_all = MaigretDatabase().load_from_file(args.json_file).sites_dict except Exception as error: print(f"ERROR: {error}") sys.exit(1) - # Create original dictionary from SitesInformation() object. - # Eventually, the rest of the code will be updated to use the new object - # directly, but this will glue the two pieces together. - site_data_all = {} - for site in sites: - site_data_all[site.name] = site.information - if args.site_list is None: # Not desired to look at a sub-set of sites site_data = site_data_all @@ -868,7 +855,7 @@ async def main(): site_data[site] = site_dataCpy.get(site) # Database consistency - enabled_count = len(list(filter(lambda x: not x.get('disabled', False), site_data.values()))) + enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values()))) print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}') # Create notify object for query results. diff --git a/maigret/resources/data.json b/maigret/resources/data.json index b887339..1c74766 100644 --- a/maigret/resources/data.json +++ b/maigret/resources/data.json @@ -1,6 +1,7 @@ { "engines": { "XenForo": { + "presenseStrs": ["XenForo"], "site": { "errorMsg": [ "The specified member cannot be found. Please enter a member's entire name.", @@ -11,7 +12,7 @@ "errors": { "You must be logged-in to do that.": "Login required" }, - "url": "{urlMain}/members/?username={username}" + "url": "{urlMain}{urlSubpath}/members/?username={username}" } }, "phpBB": { @@ -46,7 +47,7 @@ "The administrator has banned your IP address": "IP ban", "\u0418\u0437\u0432\u0438\u043d\u0438\u0442\u0435, \u0441\u0435\u0440\u0432\u0435\u0440 \u043f\u0435\u0440\u0435\u0433\u0440\u0443\u0436\u0435\u043d. \u041f\u043e\u0436\u0430\u043b\u0443\u0439\u0441\u0442\u0430, \u043f\u043e\u043f\u0440\u043e\u0431\u0443\u0439\u0442\u0435 \u0437\u0430\u0439\u0442\u0438 \u043f\u043e\u0437\u0436\u0435.": "Server is overloaded" }, - "url": "{urlMain}/{urlSubpath}/member.php?username={username}" + "url": "{urlMain}{urlSubpath}/member.php?username={username}" } } }, @@ -6845,7 +6846,7 @@ "ru" ], "urlMain": "https://www.infrance.su/", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -7987,7 +7988,7 @@ "ru" ], "urlMain": "https://la.mail.ru", - "urlSubpath": "forums", + "urlSubpath": "/forums", "username_claimed": "wizard", "username_unclaimed": "noonewouldeverusethis7" }, @@ -8554,7 +8555,7 @@ "ru" ], "urlMain": "https://minecraftonly.ru", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -8639,7 +8640,7 @@ "us" ], "urlMain": "https://www.mobile-files.com/", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -8819,7 +8820,7 @@ "pk" ], "urlMain": "https://www.movie-list.com", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -8859,7 +8860,7 @@ "us" ], "urlMain": "https://www.mpgh.net/", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -9873,7 +9874,7 @@ "engine": "vBulletin", "rank": 4840375, "urlMain": "http://p38forum.com", - "urlSubpath": "forums", + "urlSubpath": "/forums", "username_claimed": "red", "username_unclaimed": "noonewouldeverusethis7" }, @@ -10193,7 +10194,7 @@ "ru" ], "urlMain": "https://pw.mail.ru/", - "urlSubpath": "forums", + "urlSubpath": "/forums", "username_claimed": "wizard", "username_unclaimed": "noonewouldeverusethis7" }, @@ -10216,7 +10217,7 @@ "ru" ], "urlMain": "http://pesiq.ru/", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -11233,7 +11234,7 @@ "music" ], "urlMain": "http://www.rap-royalty.com", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "red", "username_unclaimed": "noonewouldeverusethis7" }, @@ -11365,7 +11366,7 @@ "ru" ], "urlMain": "http://www.redorchestra.ru", - "urlSubpath": "forums", + "urlSubpath": "/forums", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -11484,7 +11485,7 @@ "ru" ], "urlMain": "https://rev.mail.ru", - "urlSubpath": "forums", + "urlSubpath": "/forums", "username_claimed": "wizard", "username_unclaimed": "noonewouldeverusethis7" }, @@ -11523,7 +11524,7 @@ "ru" ], "urlMain": "https://www.rlocman.ru", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "elnat", "username_unclaimed": "noonewouldeverusethis7" }, @@ -11700,7 +11701,7 @@ "us" ], "urlMain": "https://www.rpgwatch.com", - "urlSubpath": "forums", + "urlSubpath": "/forums", "username_claimed": "blue", "username_unclaimed": "noonewouldeverusethis7" }, @@ -11821,7 +11822,7 @@ "ru" ], "urlMain": "http://www.russian.fi/", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -12532,7 +12533,7 @@ "ru" ], "urlMain": "https://solaris-club.net", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -12802,7 +12803,7 @@ "ru" ], "urlMain": "http://statistika.ru", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "hamam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -12900,7 +12901,7 @@ "ru" ], "urlMain": "https://www.stratege.ru", - "urlSubpath": "forums", + "urlSubpath": "/forums", "username_claimed": "blue", "username_unclaimed": "noonewouldeverusethis7" }, @@ -13199,7 +13200,7 @@ "ru" ], "urlMain": "https://tanks.mail.ru", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "red", "username_unclaimed": "noonewouldeverusethis7" }, @@ -13751,7 +13752,7 @@ "in" ], "urlMain": "https://www.trainsim.com/", - "urlSubpath": "vbts", + "urlSubpath": "/vbts", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -13986,7 +13987,7 @@ "ru" ], "urlMain": "http://tv-games.ru/", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "adam", "username_unclaimed": "noonewouldeverusethis7" }, @@ -14632,7 +14633,7 @@ "ru" ], "urlMain": "https://wf.mail.ru", - "urlSubpath": "forums", + "urlSubpath": "/forums", "username_claimed": "wizard", "username_unclaimed": "noonewouldeverusethis7" }, @@ -14961,7 +14962,7 @@ "us" ], "urlMain": "http://wirednewyork.com/", - "urlSubpath": "forum", + "urlSubpath": "/forum", "username_claimed": "blue", "username_unclaimed": "noonewouldeverusethis7" }, diff --git a/maigret/sites.py b/maigret/sites.py index 41d98cb..3f03535 100644 --- a/maigret/sites.py +++ b/maigret/sites.py @@ -1,8 +1,5 @@ -"""Sherlock Sites Information Module - -This module supports storing information about web sites. -This is the raw data that will be used to search for usernames. -""" +"""Maigret Sites Information""" +from __future__ import annotations import json import operator import sys @@ -10,8 +7,14 @@ import sys import requests -class SiteInformation(): - def __init__(self, name, url_home, url_username_format, popularity_rank, +class MaigretEngine: + def __init__(self, name, *args, **kwargs): + self.name = name + self.__dict__.update(kwargs) + + +class MaigretSite: + def __init__(self, name, url_main, url_username_format, popularity_rank, username_claimed, username_unclaimed, information): """Create Site Information Object. @@ -21,7 +24,7 @@ class SiteInformation(): Keyword Arguments: self -- This object. name -- String which identifies site. - url_home -- String containing URL for home of site. + url_main -- String containing URL for home of site. url_username_format -- String containing URL for Username format on site. NOTE: The string should contain the @@ -55,7 +58,7 @@ class SiteInformation(): """ self.name = name - self.url_home = url_home + self.url_main = url_main self.url_username_format = url_username_format if (popularity_rank is None) or (popularity_rank == 0): @@ -66,105 +69,56 @@ class SiteInformation(): self.username_claimed = username_claimed self.username_unclaimed = username_unclaimed self.information = information + self.disabled = information.get('disabled', False) + self.similar_search = information.get('similarSearch', False) + self.ignore_403 = information.get('ignore_403', False) + self.tags = information.get('tags', []) + + self.type = information.get('type', 'username') + self.headers = information.get('headers', {}) + self.errors = information.get('errors', {}) + self.url_subpath = information.get('urlSubpath', '') + self.regex_check = information.get('regexCheck', None) + self.url_probe = information.get('urlProbe', None) + self.check_type = information.get('errorType', '') + self.request_head_only = information.get('request_head_only', '') + + self.presense_strs = information.get('presenseStrs', []) + self.absence_strs = information.get('errorMsg', []) + self.request_future = None - return def __str__(self): - """Convert Object To String. - - Keyword Arguments: - self -- This object. - - Return Value: - Nicely formatted string to get information about this object. - """ - - return f"{self.name} ({self.url_home})" + return f"{self.name} ({self.url_main})" -class SitesInformation(): - def __init__(self, data_file_path=None): - """Create Sites Information Object. +class MaigretDatabase: + def __init__(self): + self._sites = [] + self._engines = [] - Contains information about all supported web sites. + @property + def sites(self: MaigretDatabase): + return self._sites - Keyword Arguments: - self -- This object. - data_file_path -- String which indicates path to data file. - The file name must end in ".json". + @property + def sites_dict(self): + return {site.name: site for site in self._sites} + - There are 3 possible formats: - * Absolute File Format - For example, "c:/stuff/data.json". - * Relative File Format - The current working directory is used - as the context. - For example, "data.json". - * URL Format - For example, - "https://example.com/data.json", or - "http://example.com/data.json". + @property + def engines(self: MaigretDatabase): + return self._engines - An exception will be thrown if the path - to the data file is not in the expected - format, or if there was any problem loading - the file. - - If this option is not specified, then a - default site list will be used. - - Return Value: - Nothing. - """ - - # Ensure that specified data file has correct extension. - if ".json" != data_file_path[-5:].lower(): - raise FileNotFoundError(f"Incorrect JSON file extension for " - f"data file '{data_file_path}'." - ) - - if (("http://" == data_file_path[:7].lower()) or - ("https://" == data_file_path[:8].lower()) - ): - # Reference is to a URL. - try: - response = requests.get(url=data_file_path) - except Exception as error: - raise FileNotFoundError(f"Problem while attempting to access " - f"data file URL '{data_file_path}': " - f"{str(error)}" - ) - if response.status_code == 200: - try: - site_data = response.json() - except Exception as error: - raise ValueError(f"Problem parsing json contents at " - f"'{data_file_path}': {str(error)}." - ) - else: - raise FileNotFoundError(f"Bad response while accessing " - f"data file URL '{data_file_path}'." - ) - else: - # Reference is to a file. - try: - with open(data_file_path, "r", encoding="utf-8") as file: - try: - data = json.load(file) - site_data = data.get("sites") - engines_data = data.get("engines") - except Exception as error: - raise ValueError(f"Problem parsing json contents at " - f"'{data_file_path}': {str(error)}." - ) - except FileNotFoundError as error: - raise FileNotFoundError(f"Problem while attempting to access " - f"data file '{data_file_path}'." - ) - - self.sites = {} + def load_from_json(self: MaigretDatabase, json_data: dict) -> MaigretDatabase: # Add all of site information from the json file to internal site list. + site_data = json_data.get("sites") + engines_data = json_data.get("engines") + + for engine_name in engines_data: + self._engines.append(MaigretEngine(engine_name, engines_data[engine_name])) + for site_name in site_data: try: site = {} @@ -178,8 +132,7 @@ class SitesInformation(): site.update(site_user_info) - self.sites[site_name] = \ - SiteInformation(site_name, + maigret_site = MaigretSite(site_name, site["urlMain"], site["url"], popularity_rank, @@ -187,15 +140,74 @@ class SitesInformation(): site["username_unclaimed"], site ) + + self._sites.append(maigret_site) except KeyError as error: - raise ValueError(f"Problem parsing json contents at " - f"'{data_file_path}' for site {site_name}: " + raise ValueError(f"Problem parsing json content for site {site_name}: " f"Missing attribute {str(error)}." ) - return + return self - def site_name_list(self, popularity_rank=False): + + def load_from_str(self: MaigretDatabase, db_str: str) -> MaigretDatabase: + try: + data = json.loads(db_str) + except Exception as error: + raise ValueError(f"Problem parsing json contents from str" + f"'{db_str[:50]}'...: {str(error)}." + ) + + return self.load_from_json(data) + + + def load_from_url(self: MaigretDatabase, url: str) -> MaigretDatabase: + is_url_valid = url.startswith('http://') or url.startswith('https://') + + if not is_url_valid: + return False + + try: + response = requests.get(url=url) + except Exception as error: + raise FileNotFoundError(f"Problem while attempting to access " + f"data file URL '{url}': " + f"{str(error)}" + ) + + if response.status_code == 200: + try: + data = response.json() + except Exception as error: + raise ValueError(f"Problem parsing json contents at " + f"'{url}': {str(error)}." + ) + else: + raise FileNotFoundError(f"Bad response while accessing " + f"data file URL '{url}'." + ) + + return self.load_from_json(data) + + + def load_from_file(self: MaigretDatabase, filename: str) -> MaigretDatabase: + try: + with open(filename, 'r', encoding='utf-8') as file: + try: + data = json.load(file) + except Exception as error: + raise ValueError(f"Problem parsing json contents from " + f"file '{filename}': {str(error)}." + ) + except FileNotFoundError as error: + raise FileNotFoundError(f"Problem while attempting to access " + f"data file '{filename}'." + ) + + return self.load_from_json(data) + + + def site_name_list(self: MaigretDatabase, popularity_rank=False): """Get Site Name List. Keyword Arguments: @@ -223,27 +235,3 @@ class SitesInformation(): site_names = sorted([site.name for site in self], key=str.lower) return site_names - - def __iter__(self): - """Iterator For Object. - - Keyword Arguments: - self -- This object. - - Return Value: - Iterator for sites object. - """ - - for site_name in self.sites: - yield self.sites[site_name] - - def __len__(self): - """Length For Object. - - Keyword Arguments: - self -- This object. - - Return Value: - Length of sites object. - """ - return len(self.sites) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_sites.py b/tests/test_sites.py new file mode 100644 index 0000000..e06d0d0 --- /dev/null +++ b/tests/test_sites.py @@ -0,0 +1,49 @@ +"""Maigret Database test functions""" +from maigret.sites import MaigretDatabase + + +def test_load_empty_db_from_str(): + db = MaigretDatabase() + db.load_from_str('{"engines": {}, "sites": {}}') + + assert db.sites == [] + assert db.engines == [] + + +def test_load_valid_db(): + db = MaigretDatabase() + db.load_from_json({ + 'engines': { + "XenForo": { + "presenseStrs": ["XenForo"], + "site": { + "errorMsg": [ + "The specified member cannot be found. Please enter a member's entire name.", + ], + "errorType": "message", + "errors": { + "You must be logged-in to do that.": "Login required" + }, + "url": "{urlMain}{urlSubpath}/members/?username={username}" + } + }, + }, + 'sites': { + "Amperka": { + "engine": "XenForo", + "rank": 121613, + "tags": [ + "ru" + ], + "urlMain": "http://forum.amperka.ru", + "username_claimed": "adam", + "username_unclaimed": "noonewouldeverusethis7" + }, + } + }) + + assert len(db.sites) == 1 + assert len(db.engines) == 1 + + assert db.sites[0].name == 'Amperka' + assert db.engines[0].name == 'XenForo'