
another competing way of listing import candidates

Mateusz Konieczny 2025-01-22 14:59:10 +01:00
parent 915625a6cd
commit 6f4654aaf8
2 changed files with 313 additions and 0 deletions

21_list_import_status.py Normal file

@@ -0,0 +1,216 @@
import matcher
import rich
import diskcache
import requests
import os
import serializing
import show_data
import shared
import prose
import generate_html
import json

graticule_report = __import__("5_generate_graticule_reports")
obtain_atp_data = __import__("2_obtain_atp_data")
config = __import__("0_config")


def skipped_osm_cases():
    return [
        'https://www.openstreetmap.org/node/12407299418',  # https://www.openstreetmap.org/changeset/160095267
    ]


def count_unique_source_links(atp_code):
    source_links = set()
    source_atp_filename = config.atp_unpacked_folder() + atp_code + '.geojson'
    with open(source_atp_filename) as file:
        try:
            atp_data = json.load(file)
            for entry in atp_data['features']:
                tags = entry['properties']
                source_links.add(tags['@source_uri'])
        except json.decoder.JSONDecodeError:
            return 0
    return len(source_links)


def main():
    area = graticule_report.global_graticule_coverage()
    print(area)
    known_listings = {}
    # note: data is fetched from matches which may be based on a different
    # version of ATP, with different spiders being broken or some missing or
    # not yet present. But looking at files directly seems to be overkill
    # for such a diagnostic tool.
    for atp_code in obtain_atp_data.all_spider_codes_iterator():
        if atp_code in ['ups_store']:
            continue  # apparently, they geoblock their website
        if '_pl' not in atp_code:
            continue
        if count_unique_source_links(atp_code) < 5:  # <=1 ? TODO
            # keep only datasets where it is easy to check the website of a specific POI
            # TODO_LOW_PRIORITY check all
            continue
        filename = "test_oh_output_" + atp_code + ".html"
        process_single_dataset(atp_code, area, filename)
        known_listings[atp_code] = filename
    generate_overview(known_listings)


def generate_overview(known_listings):
    location = config.output_folder() + "test_oh_output.html"
    with open(location, 'w') as outfile:
        output = prose.universal_html_prefix("page_title")
        output += '<section id="main_content" class="inner" style="max-width: 32cm">'
        output += "<h1>Import possibilities</h1>"
        output += "<p>List below </p>"
        output += prose.quality_disclaimer()
        spider_data = []
        for atp_code in obtain_atp_data.all_spider_codes_iterator():
            opening_hours = {'type': 'text', 'value': '????'}
            if atp_code in known_listings:
                opening_hours = {'type': 'link', 'value': {'text': 'list', 'url': known_listings[atp_code]}}
            spider_data.append({
                'columns': [{'type': 'text', 'value': atp_code}, opening_hours, {'type': 'text', 'value': '????'}],
                'display_type': 'normal',
            })
        html_table = generate_html.generate_html_table(["spider code", "opening_hours", "website"], spider_data)
        output += html_table
        output += prose.universal_html_suffix()
        outfile.write(output)
    print(f'saved to {location}')


def process_single_dataset(atp_code, area, filename):
    data = []
    for lat_anchor in range(area['min_lat'], area['max_lat']):
        for lon_anchor in range(area['min_lon'], area['max_lon']):
            output = graticule_report.match_output_for_spider_and_graticule(area, atp_code, lat_anchor, lon_anchor)
            if os.path.isfile(output):
                match_list = serializing.load_list_of_matches_from_csv(output)
                for entry in match_list:
                    if entry.match_distance is not None:
                        if entry.match_distance < config.missing_shop_distance_in_kilometers_for_specific_case(entry.atp_tags):
                            if entry.osm_link in skipped_osm_cases():
                                continue
                            if not config.is_bogus_key_worth_mentioning('opening_hours', atp_code):
                                # this will throw out entries added there recently,
                                # after the dataset being used was generated
                                continue
                            if not is_still_matching(entry):
                                continue
                            judgment = extract_opening_hours_import_info(entry)
                            if judgment['status'] in ['it_is_not_matching']:
                                pass
                                # show_conflicting_entry(entry)
                            data.append(entry_to_presentation_object(entry, judgment))
    location = config.output_folder() + filename
    with open(location, 'w') as outfile:
        output = prose.universal_html_prefix(atp_code + " opening_hours import candidates")
        output += '<section id="main_content" class="inner" style="max-width: 32cm">'
        output += "<h1>Import possibilities</h1>"
        output += "<p>List below </p>"
        output += prose.quality_disclaimer()
        html_table = generate_html.generate_html_table(header_of_presentation_objects(), data)
        output += html_table
        output += prose.universal_html_suffix()
        outfile.write(output)
    print(f"wrote file to {location}")


def header_of_presentation_objects():
    return ['ATP link', 'OSM link', 'ATP opening hours', 'OSM opening hours', 'ATP tags', 'OSM tags', 'Mismatch on']


def entry_to_presentation_object(entry, judgment):
    row_type = None
    if judgment['status'] in ['it_is_not_matching', 'dubious_match']:
        row_type = 'error'
    elif judgment['status'] in ['no_import_for_this_key']:
        row_type = 'normal'
    elif judgment['status'] in ['importable']:
        row_type = 'success'
    else:
        raise Exception("Unexpected status " + str(judgment))
    return {
        'columns': [
            {'type': 'link', 'value': {'text': 'ATP map', 'url': shared.link_to_point_in_atp(lat=entry.atp_center['lat'], lon=entry.atp_center['lon'], zoom_level=20)}},
            {'type': 'link', 'value': {'text': 'OSM object', 'url': entry.osm_link}},
            {'type': 'text', 'value': entry.atp_tags.get(config.opening_hours_key(), "")},
            {'type': 'text', 'value': entry.osm_match_tags.get("opening_hours", "")},
            {'type': 'text', 'value': show_data.tag_list_to_html(entry.atp_tags)},
            {'type': 'text', 'value': show_data.tag_list_to_html(entry.osm_match_tags)},
            {'type': 'text', 'value': ", ".join(judgment.get('mismatching_key_list', []))},
        ],
        'display_type': row_type
    }


def is_still_matching(entry):
    # TODO_LOW_PRIORITY - stabilize matching code
    """
    Matching code is still changing often; this can be used to skip cases
    which were matching when the dataset was generated but are not
    matching anymore.
    """
    def package_tags_into_mock(tags):
        return {'tags': tags, 'center': {'lat': 0, 'lon': 0}, 'osm_link': 'dummy'}
    atp_data = [package_tags_into_mock(entry.atp_tags)]
    osm_data = [package_tags_into_mock(entry.osm_match_tags)]
    matches = matcher.get_matches(osm_data, atp_data)
    return matches[0].match_distance == 0


# TODO add tests
def extract_opening_hours_import_info(entry):
    mismatching_key_list = []
    if 'brand:wikidata' in entry.osm_match_tags:
        if entry.osm_match_tags.get('brand:wikidata') != entry.atp_tags.get('brand:wikidata'):
            mismatching_key_list.append('brand:wikidata')
    if 'not:brand:wikidata' in entry.osm_match_tags:
        if entry.osm_match_tags.get('not:brand:wikidata') == entry.atp_tags.get('brand:wikidata'):
            mismatching_key_list.append('not:brand:wikidata')
    if 'brand' in entry.osm_match_tags:
        if entry.osm_match_tags.get('brand') != entry.atp_tags.get('brand'):
            mismatching_key_list.append('brand')
    # also triggers if name is missing in OSM
    if entry.osm_match_tags.get('name') != entry.atp_tags.get('name'):
        mismatching_key_list.append('name')
    if len(mismatching_key_list) > 0:
        return {'status': 'it_is_not_matching', 'mismatching_key_list': mismatching_key_list}
    if 'brand' not in entry.osm_match_tags:
        if 'brand:wikidata' not in entry.osm_match_tags:
            if 'name' not in entry.osm_match_tags:
                return {'status': 'dubious_match', 'mismatching_key_list': ['brand', 'brand:wikidata', 'name']}
            if 'brand' not in entry.atp_tags:
                return {'status': 'dubious_match', 'mismatching_key_list': ['brand', 'brand:wikidata', 'name']}
            elif entry.osm_match_tags['name'] != entry.atp_tags['brand']:
                return {'status': 'dubious_match', 'mismatching_key_list': ['brand', 'brand:wikidata', 'name']}
            else:
                pass
    if 'website' in entry.osm_match_tags:
        if entry.osm_match_tags.get('website') != entry.atp_tags.get('website'):
            return {'status': 'it_is_not_matching', 'mismatching_key_list': ['website']}
    else:
        return {'status': 'dubious_match', 'mismatching_key_list': ['website']}
    if config.opening_hours_key() not in entry.atp_tags:
        return {'status': 'no_import_for_this_key'}
    if 'opening_hours' in entry.osm_match_tags:
        if entry.osm_match_tags.get('opening_hours') != entry.atp_tags.get(config.opening_hours_key()):
            return {'status': 'it_is_not_matching', 'mismatching_key_list': ['opening_hours']}
        else:
            return {'status': 'no_import_for_this_key'}
    return {'status': 'importable'}
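

# A minimal sketch of the tests requested in the TODO above -- not part of the
# original commit. It assumes a bare stand-in object exposing the atp_tags and
# osm_match_tags attributes that extract_opening_hours_import_info() reads.
class MockMatchEntry:
    def __init__(self, atp_tags, osm_match_tags):
        self.atp_tags = atp_tags
        self.osm_match_tags = osm_match_tags


def test_extract_opening_hours_import_info():
    atp_tags = {
        'brand': 'X', 'brand:wikidata': 'Q1', 'name': 'X',
        'website': 'https://example.com/1',
        config.opening_hours_key(): 'Mo-Fr 09:00-17:00',
    }
    osm_tags = {'brand': 'X', 'brand:wikidata': 'Q1', 'name': 'X', 'website': 'https://example.com/1'}
    # OSM object agrees on brand/name/website and has no opening_hours yet
    assert extract_opening_hours_import_info(MockMatchEntry(atp_tags, osm_tags))['status'] == 'importable'
    # a conflicting brand:wikidata should be reported as not matching
    conflicting = dict(osm_tags, **{'brand:wikidata': 'Q2'})
    assert extract_opening_hours_import_info(MockMatchEntry(atp_tags, conflicting))['status'] == 'it_is_not_matching'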


def show_conflicting_entry(entry):
    print()
    print("ATP")
    rich.print(entry.atp_tags)
    print("OSM")
    rich.print(entry.osm_match_tags)
    print("ATP")
    rich.print(entry.atp_tags[config.opening_hours_key()])
    rich.print(shared.link_to_point_in_atp(lat=entry.atp_center['lat'], lon=entry.atp_center['lon'], zoom_level=20))
    print("OSM")
    rich.print(entry.osm_match_tags.get("opening_hours"))
    rich.print(entry.osm_link)


if __name__ == "__main__":
    main()

generate_html.py Normal file

@@ -0,0 +1,97 @@
def generate_html_table(headers, data):
    """
    Generates an HTML table based on headers and a list of dictionaries.

    :param headers: List of column header names.
    :param data: List of dictionaries, where each dictionary contains:
        - 'columns': A list of dictionaries for each column value, specifying:
            - 'type': 'text' or 'link'.
            - 'value': The text value or a dictionary {'text': str, 'url': str} for links.
        - 'display_type': A string indicating the row formatting ('error', 'normal', 'success').
    :return: A string containing the HTML table.
    """
    row_styles = {
        'error': 'background-color: #ffe6e6; color: #cc0000;',
        'normal': 'background-color: #ffffff; color: #000000;',
        'success': 'background-color: #e6ffe6; color: #009900;'
    }
    link_styles = {
        'error': 'color: #cc0000; text-decoration: underline;',
        'normal': 'color: #000000; text-decoration: underline;',
        'success': 'color: #009900; text-decoration: underline;'
    }
    html = '''
    <table border="1" style="border-collapse: collapse; width: 100%;">
        <thead>
            <tr>
    '''
    for header in headers:
        html += f'<th>{header}</th>'
    html += '''
            </tr>
        </thead>
        <tbody>
    '''
    for row in data:
        columns = row['columns']
        row_type = row.get('display_type', 'normal')
        row_style = row_styles.get(row_type, row_styles['normal'])
        link_style = link_styles.get(row_type, link_styles['normal'])
        html += f'<tr style="{row_style}">'
        for col in columns:
            if col['type'] == 'text':
                html += f'<td>{col["value"]}</td>'
            elif col['type'] == 'link':
                link_data = col['value']
                html += f'<td><a href="{link_data["url"]}" target="_blank" style="{link_style}">{link_data["text"]}</a></td>'
        html += '</tr>'
    html += '''
        </tbody>
    </table>
    '''
    return html


# Run example only when not imported
if __name__ == "__main__":
    headers = ["Column 1", "Column 2", "Column 3", "Column 4"]
    data = [
        {
            'columns': [
                {'type': 'text', 'value': 'Row 1'},
                {'type': 'link', 'value': {'text': 'Link 1', 'url': 'https://example.com/1'}},
                {'type': 'link', 'value': {'text': 'Link 2', 'url': 'https://example.com/2'}},
                {'type': 'text', 'value': 'Additional info'}
            ],
            'display_type': 'error'
        },
        {
            'columns': [
                {'type': 'text', 'value': 'Row 2'},
                {'type': 'link', 'value': {'text': 'Link 3', 'url': 'https://example.com/3'}},
                {'type': 'link', 'value': {'text': 'Link 4', 'url': 'https://example.com/4'}},
                {'type': 'text', 'value': 'Some details'}
            ],
            'display_type': 'normal'
        },
        {
            'columns': [
                {'type': 'text', 'value': 'Row 3'},
                {'type': 'link', 'value': {'text': 'Link 5', 'url': 'https://example.com/5'}},
                {'type': 'link', 'value': {'text': 'Link 6', 'url': 'https://example.com/6'}},
                {'type': 'text', 'value': 'Recommended details'}
            ],
            'display_type': 'success'
        }
    ]
    html_table = generate_html_table(headers, data)
    output_file = "styled_table_with_entry_types.html"
    with open(output_file, "w") as f:
        f.write(html_table)
    print(f"HTML table generated and saved as '{output_file}'.")