mirror of
https://github.com/enzet/map-machine.git
synced 2025-05-29 17:06:30 +02:00
Issue #69: preliminary support of tile collection.
This commit is contained in:
parent
1b37fa5d38
commit
088fc58870
3 changed files with 219 additions and 40 deletions
|
@ -1,18 +1,18 @@
|
|||
"""
|
||||
Getting OpenStreetMap data from the web.
|
||||
"""
|
||||
import re
|
||||
import time
|
||||
import urllib
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
|
||||
import logging
|
||||
import urllib3
|
||||
|
||||
__author__ = "Sergey Vartanov"
|
||||
__email__ = "me@enzet.ru"
|
||||
|
||||
from roentgen.ui import BoundaryBox
|
||||
|
||||
|
||||
def get_osm(
|
||||
boundary_box: str, cache_path: Path, to_update: bool = False
|
||||
|
@ -29,40 +29,13 @@ def get_osm(
|
|||
if not to_update and result_file_name.is_file():
|
||||
return result_file_name.open().read()
|
||||
|
||||
matcher = re.match(
|
||||
"(?P<left>[0-9.-]*),(?P<bottom>[0-9.-]*),"
|
||||
+ "(?P<right>[0-9.-]*),(?P<top>[0-9.-]*)",
|
||||
boundary_box,
|
||||
)
|
||||
|
||||
if not matcher:
|
||||
logging.fatal("Invalid boundary box.")
|
||||
return None
|
||||
|
||||
try:
|
||||
left = float(matcher.group("left"))
|
||||
bottom = float(matcher.group("bottom"))
|
||||
right = float(matcher.group("right"))
|
||||
top = float(matcher.group("top"))
|
||||
except ValueError:
|
||||
logging.fatal("Invalid boundary box.")
|
||||
return None
|
||||
|
||||
if left >= right:
|
||||
logging.fatal("Negative horizontal boundary.")
|
||||
return None
|
||||
if bottom >= top:
|
||||
logging.error("Negative vertical boundary.")
|
||||
return None
|
||||
if right - left > 0.5 or top - bottom > 0.5:
|
||||
logging.error("Boundary box is too big.")
|
||||
return None
|
||||
|
||||
content = get_data(
|
||||
"api.openstreetmap.org/api/0.6/map",
|
||||
{"bbox": boundary_box},
|
||||
is_secure=True,
|
||||
)
|
||||
if BoundaryBox.from_text(boundary_box) is None:
|
||||
return None
|
||||
|
||||
result_file_name.open("w+").write(content.decode("utf-8"))
|
||||
|
||||
|
|
138
roentgen/tile.py
138
roentgen/tile.py
|
@ -19,8 +19,114 @@ from roentgen.icon import ShapeExtractor
|
|||
from roentgen.mapper import Painter
|
||||
from roentgen.osm_getter import get_osm
|
||||
from roentgen.osm_reader import Map, OSMReader
|
||||
from roentgen.raster import rasterize
|
||||
from roentgen.scheme import Scheme
|
||||
from roentgen.util import MinMax
|
||||
from roentgen.ui import BoundaryBox
|
||||
|
||||
|
||||
@dataclass
|
||||
class Tiles:
|
||||
"""
|
||||
Collection of tiles.
|
||||
"""
|
||||
|
||||
tiles: List["Tile"]
|
||||
tile_1: "Tile"
|
||||
tile_2: "Tile"
|
||||
scale: int
|
||||
boundary_box: BoundaryBox
|
||||
|
||||
@classmethod
|
||||
def from_boundary_box(cls, boundary_box: BoundaryBox, scale: int):
|
||||
"""Create minimal set of tiles that cover boundary box."""
|
||||
tiles = []
|
||||
tile_1 = Tile.from_coordinates(boundary_box.get_left_top(), scale)
|
||||
tile_2 = Tile.from_coordinates(boundary_box.get_right_bottom(), scale)
|
||||
print(boundary_box.left, boundary_box.right)
|
||||
|
||||
for x in range(tile_1.x, tile_2.x + 1):
|
||||
for y in range(tile_1.y, tile_2.y + 1):
|
||||
tiles.append(Tile(x, y, scale))
|
||||
|
||||
lat_2, lon_1 = tile_1.get_coordinates()
|
||||
lat_1, lon_2 = Tile(tile_2.x + 1, tile_2.y + 1, scale).get_coordinates()
|
||||
assert lon_2 > lon_1
|
||||
assert lat_2 > lat_1
|
||||
|
||||
extended_boundary_box: BoundaryBox = BoundaryBox(
|
||||
lon_1, lat_1, lon_2, lat_2
|
||||
)
|
||||
return cls(tiles, tile_1, tile_2, scale, extended_boundary_box)
|
||||
|
||||
def draw(self, directory: Path, cache_path: Path) -> None:
|
||||
"""Draw set of tiles."""
|
||||
content = get_osm(self.boundary_box.get_format(), cache_path)
|
||||
if not content:
|
||||
logging.error("Cannot download OSM data.")
|
||||
return None
|
||||
|
||||
map_ = OSMReader().parse_osm_file(
|
||||
cache_path / (self.boundary_box.get_format() + ".osm")
|
||||
)
|
||||
for tile in self.tiles:
|
||||
file_path: Path = tile.get_file_name(directory)
|
||||
if not file_path.exists():
|
||||
tile.draw_for_map(map_, directory)
|
||||
|
||||
output_path: Path = file_path.with_suffix(".png")
|
||||
if not output_path.exists():
|
||||
rasterize(file_path, output_path)
|
||||
|
||||
def draw_image(self, cache_path: Path) -> None:
|
||||
"""Draw all tiles as one picture."""
|
||||
output_path: Path = cache_path / (
|
||||
self.boundary_box.get_format() + ".svg"
|
||||
)
|
||||
if not output_path.exists():
|
||||
content = get_osm(self.boundary_box.get_format(), cache_path)
|
||||
if not content:
|
||||
logging.error("Cannot download OSM data.")
|
||||
return None
|
||||
|
||||
map_ = OSMReader().parse_osm_file(
|
||||
cache_path / (self.boundary_box.get_format() + ".osm")
|
||||
)
|
||||
lat_2, lon_1 = self.tile_1.get_coordinates()
|
||||
lat_1, lon_2 = Tile(
|
||||
self.tile_2.x + 1, self.tile_2.y + 1, self.scale
|
||||
).get_coordinates()
|
||||
min_ = np.array((lat_1, lon_1))
|
||||
max_ = np.array((lat_2, lon_2))
|
||||
|
||||
flinger: Flinger = Flinger(MinMax(min_, max_), self.scale)
|
||||
icon_extractor: ShapeExtractor = ShapeExtractor(
|
||||
workspace.ICONS_PATH, workspace.ICONS_CONFIG_PATH
|
||||
)
|
||||
scheme: Scheme = Scheme(workspace.DEFAULT_SCHEME_PATH)
|
||||
constructor: Constructor = Constructor(
|
||||
map_, flinger, scheme, icon_extractor
|
||||
)
|
||||
constructor.construct()
|
||||
|
||||
svg: svgwrite.Drawing = svgwrite.Drawing(
|
||||
str(output_path), size=flinger.size
|
||||
)
|
||||
painter: Painter = Painter(
|
||||
map_=map_,
|
||||
flinger=flinger,
|
||||
svg=svg,
|
||||
icon_extractor=icon_extractor,
|
||||
scheme=scheme,
|
||||
)
|
||||
painter.draw(constructor)
|
||||
|
||||
with output_path.open("w+") as output_file:
|
||||
svg.write(output_file)
|
||||
|
||||
png_path: Path = cache_path / (self.boundary_box.get_format() + ".png")
|
||||
if not png_path.exists():
|
||||
rasterize(output_path, png_path)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -89,6 +195,8 @@ class Tile:
|
|||
def load_map(self, cache_path: Path) -> Optional[Map]:
|
||||
"""
|
||||
Construct map data from extended boundary box.
|
||||
|
||||
:param cache_path: directory to store SVG and PNG tiles
|
||||
"""
|
||||
coordinates_1, coordinates_2 = self.get_extended_boundary_box()
|
||||
lat1, lon1 = coordinates_1
|
||||
|
@ -98,14 +206,14 @@ class Tile:
|
|||
f"{min(lon1, lon2):.3f},{min(lat1, lat2):.3f},"
|
||||
f"{max(lon1, lon2):.3f},{max(lat1, lat2):.3f}"
|
||||
)
|
||||
content = get_osm(boundary_box, Path("cache"))
|
||||
content = get_osm(boundary_box, cache_path)
|
||||
if not content:
|
||||
logging.error("Cannot download OSM data.")
|
||||
return None
|
||||
|
||||
return OSMReader().parse_osm_file(cache_path / (boundary_box + ".osm"))
|
||||
|
||||
def get_map_name(self, directory_name: Path) -> Path:
|
||||
def get_file_name(self, directory_name: Path) -> Path:
|
||||
"""
|
||||
Get tile output SVG file path.
|
||||
"""
|
||||
|
@ -124,13 +232,17 @@ class Tile:
|
|||
Draw tile to SVG file.
|
||||
|
||||
:param directory_name: output directory to storing tiles
|
||||
:param cache_path: directory to store temporary files
|
||||
:param cache_path: directory to store SVG and PNG tiles
|
||||
"""
|
||||
map_ = self.load_map(cache_path)
|
||||
if map_ is None:
|
||||
logging.fatal("No map to draw.")
|
||||
return
|
||||
|
||||
self.draw_for_map(map_, directory_name)
|
||||
|
||||
def draw_for_map(self, map_: Map, directory_name: Path) -> None:
|
||||
"""Draw tile using existing map."""
|
||||
lat1, lon1 = self.get_coordinates()
|
||||
lat2, lon2 = Tile(self.x + 1, self.y + 1, self.scale).get_coordinates()
|
||||
|
||||
|
@ -140,7 +252,7 @@ class Tile:
|
|||
flinger: Flinger = Flinger(MinMax(min_, max_), self.scale)
|
||||
size: np.array = flinger.size
|
||||
|
||||
output_file_name: Path = self.get_map_name(directory_name)
|
||||
output_file_name: Path = self.get_file_name(directory_name)
|
||||
|
||||
svg: svgwrite.Drawing = svgwrite.Drawing(
|
||||
str(output_file_name), size=size
|
||||
|
@ -174,17 +286,25 @@ def ui(options) -> None:
|
|||
"""
|
||||
directory: Path = workspace.get_tile_path()
|
||||
|
||||
tile: Tile
|
||||
if options.coordinates:
|
||||
coordinates: List[float] = list(
|
||||
map(float, options.coordinates.strip().split(","))
|
||||
)
|
||||
tile = Tile.from_coordinates(np.array(coordinates), options.scale)
|
||||
tile: Tile = Tile.from_coordinates(np.array(coordinates), options.scale)
|
||||
tile.draw(directory, Path(options.cache))
|
||||
elif options.tile:
|
||||
scale, x, y = map(int, options.tile.split("/"))
|
||||
tile = Tile(x, y, scale)
|
||||
tile: Tile = Tile(x, y, scale)
|
||||
tile.draw(directory, Path(options.cache))
|
||||
elif options.boundary_box:
|
||||
boundary_box: Optional[BoundaryBox] = BoundaryBox.from_text(
|
||||
options.boundary_box
|
||||
)
|
||||
if boundary_box is None:
|
||||
sys.exit(1)
|
||||
tiles: Tiles = Tiles.from_boundary_box(boundary_box, options.scale)
|
||||
tiles.draw(directory, Path(options.cache))
|
||||
tiles.draw_image(Path(options.cache))
|
||||
else:
|
||||
logging.fatal("Specify either --coordinates, or --tile.")
|
||||
sys.exit(1)
|
||||
|
||||
tile.draw(directory, Path(options.cache))
|
||||
|
|
|
@ -2,11 +2,17 @@
|
|||
Command-line user interface.
|
||||
"""
|
||||
import argparse
|
||||
import re
|
||||
import sys
|
||||
|
||||
__author__ = "Sergey Vartanov"
|
||||
__email__ = "me@enzet.ru"
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
|
||||
import numpy as np
|
||||
|
||||
BOXES: str = " ▏▎▍▌▋▊▉"
|
||||
BOXES_LENGTH: int = len(BOXES)
|
||||
|
||||
|
@ -69,6 +75,13 @@ def add_tile_arguments(tile) -> None:
|
|||
default="cache",
|
||||
metavar="<path>",
|
||||
)
|
||||
tile.add_argument(
|
||||
"-b",
|
||||
"--boundary-box",
|
||||
help="construct the minimum amount of tiles that cover requested "
|
||||
"boundary box",
|
||||
metavar="<lon1>,<lat1>,<lon2>,<lat2>",
|
||||
)
|
||||
|
||||
|
||||
def add_server_arguments(tile) -> None:
|
||||
|
@ -183,3 +196,76 @@ def progress_bar(
|
|||
f"{int(length - fill_length - 1) * ' '}▏{text}"
|
||||
)
|
||||
sys.stdout.write("\033[F")
|
||||
|
||||
|
||||
@dataclass
|
||||
class BoundaryBox:
|
||||
left: float
|
||||
bottom: float
|
||||
right: float
|
||||
top: float
|
||||
|
||||
@classmethod
|
||||
def from_text(cls, boundary_box: str):
|
||||
"""
|
||||
Parse boundary box string representation.
|
||||
|
||||
Note, that:
|
||||
left < right
|
||||
bottom < top
|
||||
|
||||
:param boundary_box: boundary box string representation in the form of
|
||||
<longitude 1>,<latitude 1>,<longitude 2>,<latitude 2> or simply
|
||||
<left>,<bottom>,<right>,<top>.
|
||||
"""
|
||||
matcher = re.match(
|
||||
"(?P<left>[0-9.-]*),(?P<bottom>[0-9.-]*),"
|
||||
+ "(?P<right>[0-9.-]*),(?P<top>[0-9.-]*)",
|
||||
boundary_box,
|
||||
)
|
||||
|
||||
if not matcher:
|
||||
logging.fatal("Invalid boundary box.")
|
||||
return None
|
||||
|
||||
try:
|
||||
left = float(matcher.group("left"))
|
||||
bottom = float(matcher.group("bottom"))
|
||||
right = float(matcher.group("right"))
|
||||
top = float(matcher.group("top"))
|
||||
except ValueError:
|
||||
logging.fatal("Invalid boundary box.")
|
||||
return None
|
||||
|
||||
if left >= right:
|
||||
logging.fatal("Negative horizontal boundary.")
|
||||
return None
|
||||
if bottom >= top:
|
||||
logging.error("Negative vertical boundary.")
|
||||
return None
|
||||
if right - left > 0.5 or top - bottom > 0.5:
|
||||
logging.error("Boundary box is too big.")
|
||||
return None
|
||||
|
||||
return cls(left, bottom, right, top)
|
||||
|
||||
def get_left_top(self) -> (np.array, np.array):
|
||||
"""Get left top corner of the boundary box."""
|
||||
return self.top, self.left
|
||||
|
||||
def get_right_bottom(self) -> (np.array, np.array):
|
||||
"""Get right bottom corner of the boundary box."""
|
||||
return self.bottom, self.right
|
||||
|
||||
def get_format(self) -> str:
|
||||
"""
|
||||
Get text representation of the boundary box:
|
||||
<longitude 1>,<latitude 1>,<longitude 2>,<latitude 2>. Coordinates are
|
||||
rounded to three digits after comma.
|
||||
"""
|
||||
a = (
|
||||
f"{self.left - 0.001:.3f},{self.bottom - 0.001:.3f},"
|
||||
f"{self.right + 0.001:.3f},{self.top + 0.001:.3f}"
|
||||
)
|
||||
print(self.left, a)
|
||||
return a
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue