Improved extraction of usernames from links in personal data

This commit is contained in:
Soxoj
2021-02-15 01:36:10 +03:00
parent bb4c5dc67a
commit c0956a0e23
6 changed files with 111 additions and 5 deletions
+2 -1
View File
@@ -97,7 +97,7 @@ async def update_site_dict_from_response(sitename, site_dict, results_info, sema
site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj) site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj)
# TODO: move info separate module # TODO: move to separate class
def detect_error_page(html_text, status_code, fail_flags, ignore_403): def detect_error_page(html_text, status_code, fail_flags, ignore_403):
# Detect service restrictions such as a country restriction # Detect service restrictions such as a country restriction
for flag, msg in fail_flags.items(): for flag, msg in fail_flags.items():
@@ -270,6 +270,7 @@ def process_site_result(response, query_notify, logger, results_info, site: Maig
new_usernames[v] = k new_usernames[v] = k
results_info['ids_usernames'] = new_usernames results_info['ids_usernames'] = new_usernames
results_info['ids_links'] = eval(extracted_ids_data.get('links', '[]'))
result.ids_data = extracted_ids_data result.ids_data = extracted_ids_data
# Notify caller about results of query. # Notify caller about results of query.
+7
View File
@@ -325,11 +325,18 @@ async def main():
# TODO: fix no site data issue # TODO: fix no site data issue
if not dictionary: if not dictionary:
continue continue
new_usernames = dictionary.get('ids_usernames') new_usernames = dictionary.get('ids_usernames')
if new_usernames: if new_usernames:
for u, utype in new_usernames.items(): for u, utype in new_usernames.items():
usernames[u] = utype usernames[u] = utype
for url in dictionary.get('ids_links', []):
for s in db.sites:
u = s.detect_username(url)
if u:
usernames[u] = 'username'
# reporting for a one username # reporting for a one username
if args.xmind: if args.xmind:
filename = report_filepath_tpl.format(username=username, postfix='.xmind') filename = report_filepath_tpl.format(username=username, postfix='.xmind')
+34 -2
View File
@@ -2,11 +2,12 @@
"""Maigret Sites Information""" """Maigret Sites Information"""
import copy import copy
import json import json
import re
import sys import sys
import requests import requests
from .utils import CaseConverter from .utils import CaseConverter, URLMatcher
class MaigretEngine: class MaigretEngine:
@@ -21,6 +22,16 @@ class MaigretEngine:
class MaigretSite: class MaigretSite:
# Attributes of a MaigretSite that are runtime-only / derived state and
# must be skipped when the site is serialized back to JSON (see the
# `json` property, which filters fields against this list).
NOT_SERIALIZABLE_FIELDS = [
'name',
'engineData',
'requestFuture',
'detectedEngine',
'engineObj',
'stats',
'urlRegexp',
]
def __init__(self, name, information): def __init__(self, name, information):
self.name = name self.name = name
@@ -57,10 +68,29 @@ class MaigretSite:
# We do not know the popularity, so make site go to bottom of list. # We do not know the popularity, so make site go to bottom of list.
self.alexa_rank = sys.maxsize self.alexa_rank = sys.maxsize
self.update_detectors()
def __str__(self): def __str__(self):
return f"{self.name} ({self.url_main})" return f"{self.name} ({self.url_main})"
def update_detectors(self):
    """(Re)build the compiled profile-URL regexp used by detect_username().

    Substitutes the known template placeholders in the site's profile URL
    and compiles the result via URLMatcher.make_profile_url_regexp().
    Safe to call repeatedly (e.g. after update()/engine application).
    """
    # Always define the attribute so detect_username() can be called even
    # for sites without an 'url' template (original code left it unset).
    self.url_regexp = None

    if 'url' not in self.__dict__:
        return

    url = self.url
    for group in ('urlMain', 'urlSubpath'):
        placeholder = '{' + group + '}'
        if placeholder in url:
            # Template fields are camelCase; instance attrs are snake_case.
            url = url.replace(placeholder, self.__dict__[CaseConverter.camel_to_snake(group)])

    # regex_check (if set in the site JSON) constrains the username capture.
    self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check)
def detect_username(self, url: str) -> str:
    """Extract a username from a profile URL of this site.

    Returns the username string when *url* matches the site's compiled
    profile-URL regexp, otherwise None (also when no regexp is compiled).
    """
    # Removed a stray unused `import logging` from the original body.
    if not self.url_regexp:
        return None

    match = self.url_regexp.match(url)
    if not match:
        return None

    # The username is captured by the last group of the profile regexp;
    # strip the trailing slash left by URLs like .../username/.
    return match.groups()[-1].rstrip('/')
@property @property
def json(self): def json(self):
result = {} result = {}
@@ -70,7 +100,7 @@ class MaigretSite:
# strip empty elements # strip empty elements
if v in (False, '', [], {}, None, sys.maxsize, 'username'): if v in (False, '', [], {}, None, sys.maxsize, 'username'):
continue continue
if field in ['name', 'engineData', 'requestFuture', 'detectedEngine', 'engineObj', 'stats']: if field in self.NOT_SERIALIZABLE_FIELDS:
continue continue
result[field] = v result[field] = v
@@ -78,6 +108,7 @@ class MaigretSite:
def update(self, updates: dict) -> MaigretSite: def update(self, updates: dict) -> MaigretSite:
self.__dict__.update(updates) self.__dict__.update(updates)
self.update_detectors()
return self return self
@@ -95,6 +126,7 @@ class MaigretSite:
self.__dict__[field] = v self.__dict__[field] = v
self.engine_obj = engine self.engine_obj = engine
self.update_detectors()
return self return self
+26
View File
@@ -29,3 +29,29 @@ def enrich_link_str(link: str) -> str:
if link.startswith('www.') or (link.startswith('http') and '//' in link): if link.startswith('www.') or (link.startswith('http') and '//' in link):
return f'<a class="auto-link" href="{link}">{link}</a>' return f'<a class="auto-link" href="{link}">{link}</a>'
return link return link
class URLMatcher:
    """Helpers to parse profile URLs and build per-site URL regexps."""

    # group(1) = optional 'www.' prefix, group(2) = the URL main part.
    # NOTE(review): the dot in '(www.)?' is unescaped, so it matches any
    # character; kept byte-identical because compiled patterns are
    # compared against this exact text elsewhere (tests, site DB).
    _HTTP_URL_RE_STR = r'^https?://(www.)?(.+)$'
    HTTP_URL_RE = re.compile(_HTTP_URL_RE_STR)

    # Regexp metacharacters commonly present in URLs that must be escaped
    # before the URL main part is embedded into a pattern.
    UNSAFE_SYMBOLS = '.?'

    @classmethod
    def extract_main_part(cls, url: str) -> str:
        """Return *url* without scheme, optional 'www.' and trailing slash.

        Returns '' when *url* does not look like an http(s) URL.
        """
        match = cls.HTTP_URL_RE.search(url)
        if match and match.group(2):
            return match.group(2).rstrip('/')
        return ''

    @classmethod
    def make_profile_url_regexp(cls, url: str, username_regexp: str = '') -> re.Pattern:
        """Compile a regexp matching profile URLs of the given template.

        *url* is a profile URL template containing a '{username}'
        placeholder; *username_regexp* optionally constrains the captured
        username (defaults to a lazy '.+?').
        """
        url_main_part = cls.extract_main_part(url)
        # Escape metacharacters so literal URL chars don't act as regexp.
        for symbol in cls.UNSAFE_SYMBOLS:
            url_main_part = url_main_part.replace(symbol, f'\\{symbol}')
        username_regexp = username_regexp or '.+?'

        url_regexp = url_main_part.replace('{username}', f'({username_regexp})')
        regexp_str = cls._HTTP_URL_RE_STR.replace('(.+)', url_regexp)

        return re.compile(regexp_str)
+8
View File
@@ -113,6 +113,14 @@ def test_saving_site_error():
assert amperka.strip_engine_data().json['errors'] == {'error1': 'text1'} assert amperka.strip_engine_data().json['errors'] == {'error1': 'text1'}
def test_site_url_detector():
    # Loading the example DB must compile a profile-URL regexp for the
    # first site, usable for username extraction from a profile link.
    database = MaigretDatabase()
    database.load_from_json(EXAMPLE_DB)

    site = database.sites[0]
    expected_pattern = r'^https?://(www.)?forum\.amperka\.ru/members/\?username=(.+?)$'
    assert site.url_regexp.pattern == expected_pattern
    assert site.detect_username('http://forum.amperka.ru/members/?username=test') == 'test'
def test_ranked_sites_dict(): def test_ranked_sites_dict():
db = MaigretDatabase() db = MaigretDatabase()
db.update_site(MaigretSite('3', {'alexaRank': 1000, 'engine': 'ucoz'})) db.update_site(MaigretSite('3', {'alexaRank': 1000, 'engine': 'ucoz'}))
+33 -1
View File
@@ -1,5 +1,7 @@
"""Maigret utils test functions""" """Maigret utils test functions"""
from maigret.utils import CaseConverter, is_country_tag, enrich_link_str import itertools
import re
from maigret.utils import CaseConverter, is_country_tag, enrich_link_str, URLMatcher
def test_case_convert_camel_to_snake(): def test_case_convert_camel_to_snake():
@@ -32,3 +34,33 @@ def test_is_country_tag():
def test_enrich_link_str(): def test_enrich_link_str():
assert enrich_link_str('test') == 'test' assert enrich_link_str('test') == 'test'
assert enrich_link_str(' www.flickr.com/photos/alexaimephotography/') == '<a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>' assert enrich_link_str(' www.flickr.com/photos/alexaimephotography/') == '<a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>'
def test_url_extract_main_part():
    # extract_main_part() must strip the scheme, an optional 'www.'
    # prefix and a trailing slash for every combination of variations.
    url_main_part = 'flickr.com/photos/alexaimephotography'
    url_regexp = re.compile('^https?://(www.)?flickr.com/photos/(.+?)$')

    for scheme in ('http://', 'https://'):
        for www_prefix in ('www.', ''):
            for tail in ('/', ''):
                url = scheme + www_prefix + url_main_part + tail
                assert URLMatcher.extract_main_part(url) == url_main_part
                assert url_regexp.match(url) is not None
def test_url_make_profile_url_regexp():
    # The same compiled pattern must be produced from every equivalent
    # spelling of the profile URL template.
    url_main_part = 'flickr.com/photos/{username}'
    expected_pattern = r'^https?://(www.)?flickr\.com/photos/(.+?)$'

    variants = [
        scheme + www_prefix + url_main_part + tail
        for scheme in ('http://', 'https://')
        for www_prefix in ('www.', '')
        for tail in ('/', '')
    ]
    for url in variants:
        assert URLMatcher.make_profile_url_regexp(url).pattern == expected_pattern