mirror of
https://github.com/enzet/map-machine.git
synced 2025-05-23 05:56:28 +02:00
Add web error processing; add boundary box class.
This commit is contained in:
parent
d321f75e21
commit
175b8bb0ae
4 changed files with 88 additions and 73 deletions
16
roentgen.py
16
roentgen.py
|
@ -25,11 +25,11 @@ from roentgen.mapper import (
|
|||
check_level_number,
|
||||
check_level_overground,
|
||||
)
|
||||
from roentgen.osm_getter import get_osm
|
||||
from roentgen.osm_getter import get_osm, NetworkError
|
||||
from roentgen.osm_reader import Map, OSMReader, OverpassReader
|
||||
from roentgen.point import Point
|
||||
from roentgen.scheme import LineStyle, Scheme
|
||||
from roentgen.ui import parse_options
|
||||
from roentgen.ui import parse_options, BoundaryBox
|
||||
from roentgen.util import MinMax
|
||||
|
||||
|
||||
|
@ -40,7 +40,10 @@ def main(options) -> None:
|
|||
:param argv: command-line arguments
|
||||
"""
|
||||
if options.boundary_box:
|
||||
options.boundary_box = options.boundary_box.replace(" ", "")
|
||||
box: List[float] = list(
|
||||
map(float, options.boundary_box.replace(" ", "").split(","))
|
||||
)
|
||||
boundary_box = BoundaryBox(box[0], box[1], box[2], box[3])
|
||||
|
||||
cache_path: Path = Path(options.cache)
|
||||
cache_path.mkdir(parents=True, exist_ok=True)
|
||||
|
@ -50,9 +53,10 @@ def main(options) -> None:
|
|||
if options.input_file_name:
|
||||
input_file_names = list(map(Path, options.input_file_name))
|
||||
else:
|
||||
content = get_osm(options.boundary_box, cache_path)
|
||||
if not content:
|
||||
logging.fatal("Cannot download OSM data.")
|
||||
try:
|
||||
get_osm(boundary_box, cache_path)
|
||||
except NetworkError as e:
|
||||
logging.fatal(e.message)
|
||||
sys.exit(1)
|
||||
input_file_names = [cache_path / f"{options.boundary_box}.osm"]
|
||||
|
||||
|
|
|
@ -9,15 +9,21 @@ from typing import Dict, Optional
|
|||
import logging
|
||||
import urllib3
|
||||
|
||||
from roentgen.ui import BoundaryBox
|
||||
|
||||
__author__ = "Sergey Vartanov"
|
||||
__email__ = "me@enzet.ru"
|
||||
|
||||
from roentgen.ui import BoundaryBox
|
||||
|
||||
class NetworkError(Exception):
|
||||
def __init__(self, message: str):
|
||||
super().__init__()
|
||||
self.message: str = message
|
||||
|
||||
|
||||
def get_osm(
|
||||
boundary_box: str, cache_path: Path, to_update: bool = False
|
||||
) -> Optional[str]:
|
||||
boundary_box: BoundaryBox, cache_path: Path, to_update: bool = False
|
||||
) -> str:
|
||||
"""
|
||||
Download OSM data from the web or get if from the cache.
|
||||
|
||||
|
@ -25,7 +31,7 @@ def get_osm(
|
|||
:param cache_path: cache directory to store downloaded OSM files
|
||||
:param to_update: update cache files
|
||||
"""
|
||||
result_file_name: Path = cache_path / f"{boundary_box}.osm"
|
||||
result_file_name: Path = cache_path / f"{boundary_box.get_format()}.osm"
|
||||
|
||||
if not to_update and result_file_name.is_file():
|
||||
return result_file_name.open().read()
|
||||
|
@ -34,20 +40,17 @@ def get_osm(
|
|||
"api.openstreetmap.org/api/0.6/map",
|
||||
{"bbox": boundary_box},
|
||||
is_secure=True,
|
||||
)
|
||||
if content is None:
|
||||
return None
|
||||
if BoundaryBox.from_text(boundary_box) is None:
|
||||
return None
|
||||
).decode("utf-8")
|
||||
|
||||
result_file_name.open("w+").write(content.decode("utf-8"))
|
||||
with result_file_name.open("w+") as output_file:
|
||||
output_file.write(content)
|
||||
|
||||
return content.decode("utf-8")
|
||||
return content
|
||||
|
||||
|
||||
def get_data(
|
||||
address: str, parameters: Dict[str, str], is_secure: bool = False
|
||||
) -> Optional[bytes]:
|
||||
) -> bytes:
|
||||
"""
|
||||
Construct Internet page URL and get its descriptor.
|
||||
|
||||
|
@ -56,18 +59,17 @@ def get_data(
|
|||
:param is_secure: https or http
|
||||
:return: connection descriptor
|
||||
"""
|
||||
url = "http" + ("s" if is_secure else "") + "://" + address
|
||||
url: str = "http" + ("s" if is_secure else "") + "://" + address
|
||||
if len(parameters) > 0:
|
||||
url += "?" + urllib.parse.urlencode(parameters)
|
||||
print("Getting " + url + "...")
|
||||
pool_manager = urllib3.PoolManager()
|
||||
url += f"?{urllib.parse.urlencode(parameters)}"
|
||||
logging.info(f"Getting {url}...")
|
||||
pool_manager: urllib3.PoolManager = urllib3.PoolManager()
|
||||
urllib3.disable_warnings()
|
||||
|
||||
try:
|
||||
result = pool_manager.request("GET", url)
|
||||
except urllib3.exceptions.MaxRetryError:
|
||||
logging.fatal("Too many attempts.")
|
||||
return None
|
||||
raise NetworkError("Cannot download data: too many attempts.")
|
||||
|
||||
pool_manager.clear()
|
||||
time.sleep(2)
|
||||
|
|
|
@ -17,7 +17,7 @@ from roentgen.constructor import Constructor
|
|||
from roentgen.flinger import Flinger
|
||||
from roentgen.icon import ShapeExtractor
|
||||
from roentgen.mapper import Painter
|
||||
from roentgen.osm_getter import get_osm
|
||||
from roentgen.osm_getter import get_osm, NetworkError
|
||||
from roentgen.osm_reader import Map, OSMReader
|
||||
from roentgen.raster import rasterize
|
||||
from roentgen.scheme import Scheme
|
||||
|
@ -40,10 +40,9 @@ class Tiles:
|
|||
@classmethod
|
||||
def from_boundary_box(cls, boundary_box: BoundaryBox, scale: int):
|
||||
"""Create minimal set of tiles that cover boundary box."""
|
||||
tiles = []
|
||||
tiles: List["Tile"] = []
|
||||
tile_1 = Tile.from_coordinates(boundary_box.get_left_top(), scale)
|
||||
tile_2 = Tile.from_coordinates(boundary_box.get_right_bottom(), scale)
|
||||
print(boundary_box.left, boundary_box.right)
|
||||
|
||||
for x in range(tile_1.x, tile_2.x + 1):
|
||||
for y in range(tile_1.y, tile_2.y + 1):
|
||||
|
@ -60,14 +59,16 @@ class Tiles:
|
|||
return cls(tiles, tile_1, tile_2, scale, extended_boundary_box)
|
||||
|
||||
def draw(self, directory: Path, cache_path: Path) -> None:
|
||||
"""Draw set of tiles."""
|
||||
content = get_osm(self.boundary_box.get_format(), cache_path)
|
||||
if not content:
|
||||
logging.error("Cannot download OSM data.")
|
||||
return None
|
||||
"""
|
||||
Draw set of tiles.
|
||||
|
||||
:param directory: directory for tiles
|
||||
:param cache_path: directory for temporary OSM files
|
||||
"""
|
||||
get_osm(self.boundary_box, cache_path)
|
||||
|
||||
map_ = OSMReader().parse_osm_file(
|
||||
cache_path / (self.boundary_box.get_format() + ".osm")
|
||||
cache_path / (self.boundary_box.get_format_rounded() + ".osm")
|
||||
)
|
||||
for tile in self.tiles:
|
||||
file_path: Path = tile.get_file_name(directory)
|
||||
|
@ -79,18 +80,22 @@ class Tiles:
|
|||
rasterize(file_path, output_path)
|
||||
|
||||
def draw_image(self, cache_path: Path) -> None:
|
||||
"""Draw all tiles as one picture."""
|
||||
"""
|
||||
Draw all tiles as one picture.
|
||||
|
||||
:param cache_path: directory for temporary SVG file and OSM files
|
||||
"""
|
||||
output_path: Path = cache_path / (
|
||||
self.boundary_box.get_format() + ".svg"
|
||||
self.boundary_box.get_format_rounded() + ".svg"
|
||||
)
|
||||
if not output_path.exists():
|
||||
content = get_osm(self.boundary_box.get_format(), cache_path)
|
||||
content = get_osm(self.boundary_box, cache_path)
|
||||
if not content:
|
||||
logging.error("Cannot download OSM data.")
|
||||
return None
|
||||
|
||||
map_ = OSMReader().parse_osm_file(
|
||||
cache_path / (self.boundary_box.get_format() + ".osm")
|
||||
map_: Map = OSMReader().parse_osm_file(
|
||||
cache_path / (self.boundary_box.get_format_rounded() + ".osm")
|
||||
)
|
||||
lat_2, lon_1 = self.tile_1.get_coordinates()
|
||||
lat_1, lon_2 = Tile(
|
||||
|
@ -124,7 +129,9 @@ class Tiles:
|
|||
with output_path.open("w+") as output_file:
|
||||
svg.write(output_file)
|
||||
|
||||
png_path: Path = cache_path / (self.boundary_box.get_format() + ".png")
|
||||
png_path: Path = (
|
||||
cache_path / f"{self.boundary_box.get_format_rounded()}.png"
|
||||
)
|
||||
if not png_path.exists():
|
||||
rasterize(output_path, png_path)
|
||||
|
||||
|
@ -173,7 +180,7 @@ class Tile:
|
|||
Tile(self.x + 1, self.y + 1, self.scale).get_coordinates(),
|
||||
)
|
||||
|
||||
def get_extended_boundary_box(self) -> Tuple[np.array, np.array]:
|
||||
def get_extended_boundary_box(self) -> BoundaryBox:
|
||||
"""
|
||||
Same as get_boundary_box, but with extended boundaries.
|
||||
"""
|
||||
|
@ -182,36 +189,27 @@ class Tile:
|
|||
self.x + 1, self.y + 1, self.scale
|
||||
).get_coordinates()
|
||||
|
||||
extended_1: Tuple[float, float] = (
|
||||
int(point_1[0] * 1000) / 1000 + 0.002,
|
||||
int(point_1[1] * 1000) / 1000 - 0.001,
|
||||
)
|
||||
extended_2: Tuple[float, float] = (
|
||||
int(point_2[0] * 1000) / 1000 - 0.001,
|
||||
int(point_2[1] * 1000) / 1000 + 0.002,
|
||||
)
|
||||
return np.array(extended_1), np.array(extended_2)
|
||||
# FIXME: check negative values
|
||||
|
||||
def load_map(self, cache_path: Path) -> Optional[Map]:
|
||||
lat_2 = int(point_1[0] * 1000) / 1000 + 0.002
|
||||
lon_1 = int(point_1[1] * 1000) / 1000 - 0.001
|
||||
lat_1 = int(point_2[0] * 1000) / 1000 - 0.001
|
||||
lon_2 = int(point_2[1] * 1000) / 1000 + 0.002
|
||||
|
||||
return BoundaryBox(lon_1, lat_1, lon_2, lat_2)
|
||||
|
||||
def load_map(self, cache_path: Path) -> Map:
|
||||
"""
|
||||
Construct map data from extended boundary box.
|
||||
|
||||
:param cache_path: directory to store SVG and PNG tiles
|
||||
:param cache_path: directory to store OSM data files
|
||||
"""
|
||||
coordinates_1, coordinates_2 = self.get_extended_boundary_box()
|
||||
lat1, lon1 = coordinates_1
|
||||
lat2, lon2 = coordinates_2
|
||||
boundary_box: BoundaryBox = self.get_extended_boundary_box()
|
||||
get_osm(boundary_box, cache_path)
|
||||
|
||||
boundary_box: str = (
|
||||
f"{min(lon1, lon2):.3f},{min(lat1, lat2):.3f},"
|
||||
f"{max(lon1, lon2):.3f},{max(lat1, lat2):.3f}"
|
||||
return OSMReader().parse_osm_file(
|
||||
cache_path / f"{boundary_box.format()}.osm"
|
||||
)
|
||||
content = get_osm(boundary_box, cache_path)
|
||||
if not content:
|
||||
logging.error("Cannot download OSM data.")
|
||||
return None
|
||||
|
||||
return OSMReader().parse_osm_file(cache_path / (boundary_box + ".osm"))
|
||||
|
||||
def get_file_name(self, directory_name: Path) -> Path:
|
||||
"""
|
||||
|
@ -234,10 +232,10 @@ class Tile:
|
|||
:param directory_name: output directory to storing tiles
|
||||
:param cache_path: directory to store SVG and PNG tiles
|
||||
"""
|
||||
map_ = self.load_map(cache_path)
|
||||
if map_ is None:
|
||||
logging.fatal("No map to draw.")
|
||||
return
|
||||
try:
|
||||
map_: Map = self.load_map(cache_path)
|
||||
except NetworkError as e:
|
||||
raise NetworkError(f"Map doesn't loaded. {e.message}")
|
||||
|
||||
self.draw_for_map(map_, directory_name)
|
||||
|
||||
|
@ -291,7 +289,10 @@ def ui(options) -> None:
|
|||
map(float, options.coordinates.strip().split(","))
|
||||
)
|
||||
tile: Tile = Tile.from_coordinates(np.array(coordinates), options.scale)
|
||||
try:
|
||||
tile.draw(directory, Path(options.cache))
|
||||
except NetworkError as e:
|
||||
logging.fatal(e.message)
|
||||
elif options.tile:
|
||||
scale, x, y = map(int, options.tile.split("/"))
|
||||
tile: Tile = Tile(x, y, scale)
|
||||
|
|
|
@ -257,15 +257,23 @@ class BoundaryBox:
|
|||
"""Get right bottom corner of the boundary box."""
|
||||
return self.bottom, self.right
|
||||
|
||||
def get_format_rounded(self) -> str:
|
||||
"""
|
||||
Get text representation of the boundary box:
|
||||
<longitude 1>,<latitude 1>,<longitude 2>,<latitude 2>. Coordinates are
|
||||
rounded to three digits after comma.
|
||||
"""
|
||||
return (
|
||||
f"{self.left - 0.001:.3f},{self.bottom - 0.001:.3f},"
|
||||
f"{self.right + 0.001:.3f},{self.top + 0.001:.3f}"
|
||||
)
|
||||
|
||||
def get_format(self) -> str:
|
||||
"""
|
||||
Get text representation of the boundary box:
|
||||
<longitude 1>,<latitude 1>,<longitude 2>,<latitude 2>. Coordinates are
|
||||
rounded to three digits after comma.
|
||||
"""
|
||||
a = (
|
||||
f"{self.left - 0.001:.3f},{self.bottom - 0.001:.3f},"
|
||||
f"{self.right + 0.001:.3f},{self.top + 0.001:.3f}"
|
||||
return (
|
||||
f"{self.left:.3f},{self.bottom:.3f},{self.right:.3f},{self.top:.3f}"
|
||||
)
|
||||
print(self.left, a)
|
||||
return a
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue