From 175b8bb0ae511cafa3b04b876014ceb14c28aabd Mon Sep 17 00:00:00 2001 From: Sergey Vartanov Date: Sat, 7 Aug 2021 14:39:37 +0300 Subject: [PATCH] Add web error processing; add boundary box class. --- roentgen.py | 16 +++++--- roentgen/osm_getter.py | 38 +++++++++--------- roentgen/tile.py | 89 +++++++++++++++++++++--------------------- roentgen/ui.py | 18 ++++++--- 4 files changed, 88 insertions(+), 73 deletions(-) diff --git a/roentgen.py b/roentgen.py index a6ec43d..b874fb8 100644 --- a/roentgen.py +++ b/roentgen.py @@ -25,11 +25,11 @@ from roentgen.mapper import ( check_level_number, check_level_overground, ) -from roentgen.osm_getter import get_osm +from roentgen.osm_getter import get_osm, NetworkError from roentgen.osm_reader import Map, OSMReader, OverpassReader from roentgen.point import Point from roentgen.scheme import LineStyle, Scheme -from roentgen.ui import parse_options +from roentgen.ui import parse_options, BoundaryBox from roentgen.util import MinMax @@ -40,7 +40,10 @@ def main(options) -> None: :param argv: command-line arguments """ if options.boundary_box: - options.boundary_box = options.boundary_box.replace(" ", "") + box: List[float] = list( + map(float, options.boundary_box.replace(" ", "").split(",")) + ) + boundary_box = BoundaryBox(box[0], box[1], box[2], box[3]) cache_path: Path = Path(options.cache) cache_path.mkdir(parents=True, exist_ok=True) @@ -50,9 +53,10 @@ def main(options) -> None: if options.input_file_name: input_file_names = list(map(Path, options.input_file_name)) else: - content = get_osm(options.boundary_box, cache_path) - if not content: - logging.fatal("Cannot download OSM data.") + try: + get_osm(boundary_box, cache_path) + except NetworkError as e: + logging.fatal(e.message) sys.exit(1) input_file_names = [cache_path / f"{options.boundary_box}.osm"] diff --git a/roentgen/osm_getter.py b/roentgen/osm_getter.py index 2189104..c737303 100644 --- a/roentgen/osm_getter.py +++ b/roentgen/osm_getter.py @@ -9,15 +9,21 @@ from typing import Dict, Optional import logging import urllib3 +from roentgen.ui import BoundaryBox + __author__ = "Sergey Vartanov" __email__ = "me@enzet.ru" -from roentgen.ui import BoundaryBox + +class NetworkError(Exception): + def __init__(self, message: str): + super().__init__() + self.message: str = message def get_osm( - boundary_box: str, cache_path: Path, to_update: bool = False -) -> Optional[str]: + boundary_box: BoundaryBox, cache_path: Path, to_update: bool = False +) -> str: """ Download OSM data from the web or get if from the cache. @@ -25,7 +31,7 @@ def get_osm( :param cache_path: cache directory to store downloaded OSM files :param to_update: update cache files """ - result_file_name: Path = cache_path / f"{boundary_box}.osm" + result_file_name: Path = cache_path / f"{boundary_box.get_format()}.osm" if not to_update and result_file_name.is_file(): return result_file_name.open().read() @@ -34,20 +40,17 @@ def get_osm( "api.openstreetmap.org/api/0.6/map", {"bbox": boundary_box}, is_secure=True, - ) - if content is None: - return None - if BoundaryBox.from_text(boundary_box) is None: - return None + ).decode("utf-8") - result_file_name.open("w+").write(content.decode("utf-8")) + with result_file_name.open("w+") as output_file: + output_file.write(content) - return content.decode("utf-8") + return content def get_data( address: str, parameters: Dict[str, str], is_secure: bool = False -) -> Optional[bytes]: +) -> bytes: """ Construct Internet page URL and get its descriptor. @@ -56,18 +59,17 @@ def get_data( :param is_secure: https or http :return: connection descriptor """ - url = "http" + ("s" if is_secure else "") + "://" + address + url: str = "http" + ("s" if is_secure else "") + "://" + address if len(parameters) > 0: - url += "?" + urllib.parse.urlencode(parameters) - print("Getting " + url + "...") - pool_manager = urllib3.PoolManager() + url += f"?{urllib.parse.urlencode(parameters)}" + logging.info(f"Getting {url}...") + pool_manager: urllib3.PoolManager = urllib3.PoolManager() urllib3.disable_warnings() try: result = pool_manager.request("GET", url) except urllib3.exceptions.MaxRetryError: - logging.fatal("Too many attempts.") - return None + raise NetworkError("Cannot download data: too many attempts.") pool_manager.clear() time.sleep(2) diff --git a/roentgen/tile.py b/roentgen/tile.py index 619df4a..efc8e0a 100644 --- a/roentgen/tile.py +++ b/roentgen/tile.py @@ -17,7 +17,7 @@ from roentgen.constructor import Constructor from roentgen.flinger import Flinger from roentgen.icon import ShapeExtractor from roentgen.mapper import Painter -from roentgen.osm_getter import get_osm +from roentgen.osm_getter import get_osm, NetworkError from roentgen.osm_reader import Map, OSMReader from roentgen.raster import rasterize from roentgen.scheme import Scheme @@ -40,10 +40,9 @@ class Tiles: @classmethod def from_boundary_box(cls, boundary_box: BoundaryBox, scale: int): """Create minimal set of tiles that cover boundary box.""" - tiles = [] + tiles: List["Tile"] = [] tile_1 = Tile.from_coordinates(boundary_box.get_left_top(), scale) tile_2 = Tile.from_coordinates(boundary_box.get_right_bottom(), scale) - print(boundary_box.left, boundary_box.right) for x in range(tile_1.x, tile_2.x + 1): for y in range(tile_1.y, tile_2.y + 1): @@ -60,14 +59,16 @@ class Tiles: return cls(tiles, tile_1, tile_2, scale, extended_boundary_box) def draw(self, directory: Path, cache_path: Path) -> None: - """Draw set of tiles.""" - content = get_osm(self.boundary_box.get_format(), cache_path) - if not content: - logging.error("Cannot download OSM data.") - return None + """ + Draw set of tiles. + + :param directory: directory for tiles + :param cache_path: directory for temporary OSM files + """ + get_osm(self.boundary_box, cache_path) map_ = OSMReader().parse_osm_file( - cache_path / (self.boundary_box.get_format() + ".osm") + cache_path / (self.boundary_box.get_format_rounded() + ".osm") ) for tile in self.tiles: file_path: Path = tile.get_file_name(directory) @@ -79,18 +80,22 @@ class Tiles: rasterize(file_path, output_path) def draw_image(self, cache_path: Path) -> None: - """Draw all tiles as one picture.""" + """ + Draw all tiles as one picture. + + :param cache_path: directory for temporary SVG file and OSM files + """ output_path: Path = cache_path / ( - self.boundary_box.get_format() + ".svg" + self.boundary_box.get_format_rounded() + ".svg" ) if not output_path.exists(): - content = get_osm(self.boundary_box.get_format(), cache_path) + content = get_osm(self.boundary_box, cache_path) if not content: logging.error("Cannot download OSM data.") return None - map_ = OSMReader().parse_osm_file( - cache_path / (self.boundary_box.get_format() + ".osm") + map_: Map = OSMReader().parse_osm_file( + cache_path / (self.boundary_box.get_format_rounded() + ".osm") ) lat_2, lon_1 = self.tile_1.get_coordinates() lat_1, lon_2 = Tile( @@ -124,7 +129,9 @@ class Tiles: with output_path.open("w+") as output_file: svg.write(output_file) - png_path: Path = cache_path / (self.boundary_box.get_format() + ".png") + png_path: Path = ( + cache_path / f"{self.boundary_box.get_format_rounded()}.png" + ) if not png_path.exists(): rasterize(output_path, png_path) @@ -173,7 +180,7 @@ class Tile: Tile(self.x + 1, self.y + 1, self.scale).get_coordinates(), ) - def get_extended_boundary_box(self) -> Tuple[np.array, np.array]: + def get_extended_boundary_box(self) -> BoundaryBox: """ Same as get_boundary_box, but with extended boundaries. """ @@ -182,36 +189,27 @@ class Tile: self.x + 1, self.y + 1, self.scale ).get_coordinates() - extended_1: Tuple[float, float] = ( - int(point_1[0] * 1000) / 1000 + 0.002, - int(point_1[1] * 1000) / 1000 - 0.001, - ) - extended_2: Tuple[float, float] = ( - int(point_2[0] * 1000) / 1000 - 0.001, - int(point_2[1] * 1000) / 1000 + 0.002, - ) - return np.array(extended_1), np.array(extended_2) + # FIXME: check negative values - def load_map(self, cache_path: Path) -> Optional[Map]: + lat_2 = int(point_1[0] * 1000) / 1000 + 0.002 + lon_1 = int(point_1[1] * 1000) / 1000 - 0.001 + lat_1 = int(point_2[0] * 1000) / 1000 - 0.001 + lon_2 = int(point_2[1] * 1000) / 1000 + 0.002 + + return BoundaryBox(lon_1, lat_1, lon_2, lat_2) + + def load_map(self, cache_path: Path) -> Map: """ Construct map data from extended boundary box. - :param cache_path: directory to store SVG and PNG tiles + :param cache_path: directory to store OSM data files """ - coordinates_1, coordinates_2 = self.get_extended_boundary_box() - lat1, lon1 = coordinates_1 - lat2, lon2 = coordinates_2 + boundary_box: BoundaryBox = self.get_extended_boundary_box() + get_osm(boundary_box, cache_path) - boundary_box: str = ( - f"{min(lon1, lon2):.3f},{min(lat1, lat2):.3f}," - f"{max(lon1, lon2):.3f},{max(lat1, lat2):.3f}" + return OSMReader().parse_osm_file( + cache_path / f"{boundary_box.format()}.osm" ) - content = get_osm(boundary_box, cache_path) - if not content: - logging.error("Cannot download OSM data.") - return None - - return OSMReader().parse_osm_file(cache_path / (boundary_box + ".osm")) def get_file_name(self, directory_name: Path) -> Path: """ @@ -234,10 +232,10 @@ class Tile: :param directory_name: output directory to storing tiles :param cache_path: directory to store SVG and PNG tiles """ - map_ = self.load_map(cache_path) - if map_ is None: - logging.fatal("No map to draw.") - return + try: + map_: Map = self.load_map(cache_path) + except NetworkError as e: + raise NetworkError(f"Map doesn't loaded. {e.message}") self.draw_for_map(map_, directory_name) @@ -291,7 +289,10 @@ def ui(options) -> None: map(float, options.coordinates.strip().split(",")) ) tile: Tile = Tile.from_coordinates(np.array(coordinates), options.scale) - tile.draw(directory, Path(options.cache)) + try: + tile.draw(directory, Path(options.cache)) + except NetworkError as e: + logging.fatal(e.message) elif options.tile: scale, x, y = map(int, options.tile.split("/")) tile: Tile = Tile(x, y, scale) diff --git a/roentgen/ui.py b/roentgen/ui.py index c631ab9..dae0ad2 100644 --- a/roentgen/ui.py +++ b/roentgen/ui.py @@ -257,15 +257,23 @@ class BoundaryBox: """Get right bottom corner of the boundary box.""" return self.bottom, self.right + def get_format_rounded(self) -> str: + """ + Get text representation of the boundary box: + ,,,. Coordinates are + rounded to three digits after comma. + """ + return ( + f"{self.left - 0.001:.3f},{self.bottom - 0.001:.3f}," + f"{self.right + 0.001:.3f},{self.top + 0.001:.3f}" + ) + def get_format(self) -> str: """ Get text representation of the boundary box: ,,,. Coordinates are rounded to three digits after comma. """ - a = ( - f"{self.left - 0.001:.3f},{self.bottom - 0.001:.3f}," - f"{self.right + 0.001:.3f},{self.top + 0.001:.3f}" + return ( + f"{self.left:.3f},{self.bottom:.3f},{self.right:.3f},{self.top:.3f}" ) - print(self.left, a) - return a