map-machine/roentgen/constructor.py
2020-09-13 21:52:42 +03:00

392 lines
12 KiB
Python

"""
Construct Röntgen nodes and ways.
Author: Sergey Vartanov (me@enzet.ru).
"""
import numpy as np
from datetime import datetime
from hashlib import sha256
from typing import Any, Dict, List, Optional, Set
from roentgen import ui
from roentgen.extract_icon import DEFAULT_SMALL_SHAPE_ID
from roentgen.flinger import GeoFlinger
from roentgen.osm_reader import OSMMember, OSMRelation, OSMWay, OSMNode
from roentgen.scheme import IconSet, Scheme
from roentgen.util import MinMax
# Debug switch: when true, ways that match no element of the scheme are still
# drawn, as red outlines, instead of being left out.
DEBUG: bool = False
class Node:
    """
    Node in Röntgen terms: a set of icons and tags bound to a point.
    """
    def __init__(
            self, icon_set: IconSet, tags: Dict[str, str],
            point: (float, float), path: Optional[str],
            priority: int = 0, is_for_node: bool = True):
        """
        :param icon_set: icons of the node
        :param tags: tag dictionary
        :param point: node position, must not be None
        :param path: path string stored with the node, or None
        :param priority: node priority
        :param is_for_node: flag stored as is; the way constructor passes
            False for icons it creates for ways
        """
        assert point is not None

        self.point = point
        self.path = path
        self.tags = tags
        self.icon_set: IconSet = icon_set

        self.priority = priority
        self.is_for_node = is_for_node
        # Every node starts on the zero layer.
        self.layer = 0

    def get_tag(self, key: str):
        """
        Get tag value by its key.

        :param key: tag key
        :return: tag value or None if there is no such tag
        """
        return self.tags.get(key)
class Way:
    """
    Way in Röntgen terms: geometry (nodes or path) together with its style.
    """
    def __init__(
            self, kind: str, nodes: List[OSMNode], path, style: Dict[str, Any],
            layer: float = 0.0, priority: float = 0, levels=None):
        """
        :param kind: way kind name
        :param nodes: way nodes, may be empty if path is given
        :param path: way path, may be empty if nodes are given
        :param style: style dictionary
        :param layer: way layer
        :param priority: way priority
        :param levels: stored as is
        """
        # Some geometry is required: either nodes or a path.
        assert nodes or path

        self.kind, self.path = kind, path
        self.nodes: List[OSMNode] = nodes
        self.style: Dict[str, Any] = style
        self.layer, self.priority = layer, priority
        self.levels = levels
def get_float(string) -> float:
    """
    Try to parse float from a string.

    :param string: value to parse
    :return: parsed value, or 0.0 if the value cannot be converted to float
    """
    try:
        return float(string)
    except (TypeError, ValueError):
        # TypeError covers values such as None, which float() rejects
        # without raising ValueError.
        return 0.0
def line_center(nodes: List[OSMNode], flinger: GeoFlinger) -> np.array:
    """
    Get the center of the bounding box of flung node positions.

    :param nodes: node list
    :param flinger: flinger that remaps geo positions
    :return: array of two values: the midpoints between minimum and maximum
        of the first and of the second flung coordinate
    """
    first_range: MinMax = MinMax()
    second_range: MinMax = MinMax()

    for flung in (flinger.fling(node.position) for node in nodes):
        first_range.update(flung[0])
        second_range.update(flung[1])

    return np.array((
        (first_range.min_ + first_range.max_) / 2.0,
        (second_range.min_ + second_range.max_) / 2.0))
def get_user_color(text: str, seed: str) -> str:
    """
    Generate pseudo-random color based on text.

    The color is taken from the last six hexadecimal digits of the SHA-256
    digest of seed followed by text, so the same pair always gives the same
    color.

    :param text: text to generate color for (e.g. user name)
    :param seed: string that is prepended to the text before hashing
    :return: color in "#rrggbb" form; "#000000" for empty text
    """
    if text == "":
        return "#000000"

    digest_tail: str = sha256((seed + text).encode("utf-8")).hexdigest()[-6:]
    channels = [int(digest_tail[i:i + 2], 16) for i in (0, 2, 4)]

    # Share of gray mixed into every channel; 0 keeps the color as is.
    gray_share = 0
    gray = sum(channels) / 3.
    channels = [
        int(channel * (1 - gray_share) + gray * gray_share)
        for channel in channels]

    # Every channel is padded to two digits on its own; padding only the
    # joined string would shift digits between channels.
    return "#" + "".join(f"{channel:02x}" for channel in channels)
def get_time_color(time: Optional[datetime]) -> str:
    """
    Generate color based on time.

    The older the time is, the lower is the first color channel and the
    higher is the last one; the middle channel is always "AA".

    :param time: time to generate color for
    :return: color in "#rrggbb" form; "#000000" if time is None
    """
    if time is None:
        return "#000000"

    # Use the time zone of the argument, so that aware and naive times are
    # never mixed in the subtraction (for naive time tzinfo is None).
    delta: float = (datetime.now(time.tzinfo) - time).total_seconds()

    # One color step per 500000 seconds, clamped to one byte; times in the
    # future are treated as "now".
    age: int = max(0, min(0xFF, int(delta / 500000.)))

    return f"#{0xFF - age:02x}AA{age:02x}"
def glue(ways: List[OSMWay]) -> List[List[OSMNode]]:
    """
    Try to glue ways that share nodes.

    Ways that are already cycles are taken as is. Every other way is tried
    to be glued with the remaining ones; a glued way either becomes a cycle
    or returns to the pool to be glued further.

    :param ways: ways to glue
    :return: node lists of resulting ways
    """
    node_lists: List[List[OSMNode]] = []
    pending: Set[OSMWay] = set()

    for source_way in ways:  # type: OSMWay
        if source_way.is_cycle():
            node_lists.append(source_way.nodes)
            continue
        pending.add(source_way)

    while pending:
        current: OSMWay = pending.pop()

        for candidate in pending:  # type: OSMWay
            merged: Optional[OSMWay] = current.try_to_glue(candidate)
            if merged:
                break
        else:
            # Nothing can be glued to the current way: it is final.
            node_lists.append(current.nodes)
            continue

        pending.remove(candidate)
        if merged.is_cycle():
            node_lists.append(merged.nodes)
        else:
            pending.add(merged)

    return node_lists
def get_path(nodes: List[OSMNode], shift: np.array, flinger: GeoFlinger) -> str:
    """
    Construct SVG path from nodes.

    :param nodes: nodes of the path
    :param shift: vector added to every flung node position
    :param flinger: flinger that remaps node positions
    :return: path commands separated by spaces: "M x,y" for the first node,
        "L x,y" for the others and closing "Z" if the first node is equal to
        the last one; empty string if there are no nodes
    """
    if not nodes:
        return ""

    commands: List[str] = []
    for index, node in enumerate(nodes):
        flung = flinger.fling(node.position) + shift
        # The first node is detected by its index and not by truthiness of
        # the previous node object.
        commands.append(("L" if index else "M") + f" {flung[0]},{flung[1]}")

    if nodes[0] == nodes[-1]:
        commands.append("Z")

    return " ".join(commands)
class Constructor:
    """
    Röntgen node and way constructor.

    Converts OSM ways, multipolygon relations and nodes of the map into
    `Way` and `Node` objects and collects them in `ways` and `nodes`.
    """
    def __init__(self, check_level, mode, seed, map_, flinger, scheme: Scheme):
        """
        :param check_level: callable that takes tag dictionary; elements for
            which it returns false value are skipped
        :param mode: drawing mode; "user-coloring" and "time" are processed
            specially, with any other value elements are drawn by scheme
        :param seed: string that is mixed into user colors
        :param map_: map with `way_map`, `relation_map` and `node_map`
        :param flinger: flinger that remaps positions with `fling`
        :param scheme: drawing scheme
        """
        self.check_level = check_level
        self.mode = mode
        self.seed = seed
        self.map_ = map_
        self.flinger = flinger
        self.scheme: Scheme = scheme

        self.nodes: List[Node] = []
        self.ways: List[Way] = []

    def construct_ways(self):
        """
        Construct Röntgen ways.
        """
        for way_id in self.map_.way_map:  # type: int
            way: OSMWay = self.map_.way_map[way_id]
            if not self.check_level(way.tags):
                continue
            self.construct_way(way, way.tags, None)

    @staticmethod
    def _get_layer(tags: Dict[str, Any]) -> float:
        """
        Combine "level" and "layer" tags into one ordering value.

        Level is the mean of semicolon-separated values of the "level" tag
        (0 if it cannot be parsed) and dominates the layer.
        """
        layer: float = 0
        level: float = 0

        if "layer" in tags:
            layer = get_float(tags["layer"])
        if "level" in tags:
            try:
                level_values = list(map(float, tags["level"].split(";")))
                level = sum(level_values) / len(level_values)
            except ValueError:
                # Unparsable level is deliberately treated as level 0.
                pass

        return 100 * level + 0.01 * layer

    @staticmethod
    def _is_matched(element: Dict[str, Any], tags: Dict[str, Any]) -> bool:
        """
        Check whether tags satisfy "tags" and "no_tags" of a scheme element.
        """
        for key in element["tags"]:  # type: str
            matcher = element["tags"][key]
            if key not in tags:
                return False
            # "*" accepts any value; otherwise the value should be equal to
            # the matcher or be contained in it.
            if matcher != "*" and tags[key] != matcher and \
                    tags[key] not in matcher:
                return False

        if "no_tags" in element:
            for key in element["no_tags"]:  # type: str
                if key in tags and tags[key] == element["no_tags"][key]:
                    return False

        return True

    @staticmethod
    def _needs_icon(way: Optional[OSMWay], tags: Dict[str, Any],
                    center_point) -> bool:
        """
        Check whether an icon should be placed in the center of the way.

        There is no center point if there is no way (e.g. for relations), so
        such elements never get an icon.
        """
        if center_point is None:
            return False
        # NOTE(review): any non-empty value of "area" tag counts here,
        # "no" included; confirm that it is intended.
        return bool(way.is_cycle() or ("area" in tags and tags["area"]))

    def _add_icon(self, tags: Dict[str, Any], center_point,
                  path: Optional[str]) -> None:
        """
        Add node with icons of the way to its center point.
        """
        icon_set: IconSet = self.scheme.get_icon(tags)
        self.nodes.append(Node(
            icon_set, tags, center_point, path, is_for_node=False))

    def construct_way(
            self, way: Optional[OSMWay], tags: Dict[str, Any],
            path: Optional[str]) -> None:
        """
        Way construction.

        :param way: OSM way
        :param tags: way tag dictionary
        :param path: way path (if there is no nodes)
        """
        assert way or path

        layer: float = self._get_layer(tags)

        nodes = None
        center_point = None
        if way:
            center_point = line_center(way.nodes, self.flinger)
            nodes = way.nodes

        # In coloring modes ways are drawn as thin lines colored by author
        # or by time; elements without OSM way are skipped.
        if self.mode in ("user-coloring", "time"):
            if not way:
                return
            if self.mode == "user-coloring":
                color = get_user_color(way.user, self.seed)
            else:
                color = get_time_color(way.timestamp)
            self.ways.append(
                Way("way", nodes, path,
                    {"fill": "none", "stroke": color, "stroke-width": 1}))
            return

        if not tags:
            return

        kind: str = "way"
        levels = None

        if "building" in tags:
            kind = "building"
            if "building:levels" in tags:
                try:
                    levels = float(tags["building:levels"])
                except ValueError:
                    levels = None

        needs_icon: bool = self._needs_icon(way, tags, center_point)
        appended = False

        for element in self.scheme.ways:  # type: Dict[str, Any]
            if not self._is_matched(element, tags):
                continue

            style: Dict[str, Any] = {"fill": "none"}
            if "layer" in element:
                # NOTE(review): layer is accumulated, so it also affects
                # elements matched later; confirm that it is intended.
                layer += element["layer"]
            for key in element:  # type: str
                if key in ["tags", "no_tags", "layer", "level", "icon"]:
                    continue
                value = element[key]
                if isinstance(value, str) and value.endswith("_color"):
                    value = self.scheme.get_color(value)
                style[key] = value

            self.ways.append(
                Way(kind, nodes, path, style, layer, 50, levels))
            if needs_icon:
                self._add_icon(tags, center_point, path)
            appended = True

        if appended:
            return

        if DEBUG:
            debug_style: Dict[str, Any] = {
                "fill": "none", "stroke": "#FF0000", "stroke-width": 1}
            self.ways.append(Way(
                kind, nodes, path, debug_style, layer, 50, levels))
        # The same condition as for matched elements: without a center point
        # node cannot be created.
        if needs_icon:
            self._add_icon(tags, center_point, path)

    def construct_relations(self) -> None:
        """
        Construct Röntgen ways from OSM relations.

        Only multipolygon relations are processed: their outer and inner
        member ways are glued and joined into one path.
        """
        no_shift = np.array([0, 0])

        for relation_id in self.map_.relation_map:
            relation: OSMRelation = self.map_.relation_map[relation_id]
            tags = relation.tags
            if not self.check_level(tags):
                continue
            if "type" not in tags or tags["type"] != "multipolygon":
                continue

            inners, outers = [], []
            for member in relation.members:  # type: OSMMember
                if member.type_ != "way":
                    continue
                if member.ref not in self.map_.way_map:
                    continue
                if member.role == "inner":
                    inners.append(self.map_.way_map[member.ref])
                elif member.role == "outer":
                    outers.append(self.map_.way_map[member.ref])

            inners_path = glue(inners)
            outers_path = glue(outers)

            paths: List[str] = [
                get_path(nodes, no_shift, self.flinger)
                for nodes in outers_path if nodes]
            # Inner rings are drawn in the opposite direction. Reversed copy
            # is used since node lists may be shared with ways of the map.
            paths += [
                get_path(list(reversed(nodes)), no_shift, self.flinger)
                for nodes in inners_path if nodes]

            p = "".join(path + " " for path in paths)
            if p:
                self.construct_way(None, tags, p)

    def construct_nodes(self) -> None:
        """
        Draw nodes.

        Nodes are processed in order of decreasing latitude.
        """
        print("Draw nodes...")

        start_time = datetime.now()

        node_map = self.map_.node_map
        node_count: int = len(node_map)

        ordered_ids = sorted(
            node_map.keys(), key=lambda x: -node_map[x].position.lat)

        for node_number, node_id in enumerate(ordered_ids, 1):
            ui.progress_bar(node_number, node_count)
            node: OSMNode = node_map[node_id]
            flung = self.flinger.fling(node.position)
            tags = node.tags

            if not self.check_level(tags):
                continue

            icon_set: IconSet = self.scheme.get_icon(tags)

            if self.mode in ["time", "user-coloring"]:
                if not tags:
                    continue
                # In coloring modes every tagged node is a small shape; the
                # loop goes on to color and append the node.
                icon_set.icons = [[DEFAULT_SMALL_SHAPE_ID]]
            if self.mode == "user-coloring":
                icon_set.color = get_user_color(node.user, self.seed)
            if self.mode == "time":
                icon_set.color = get_time_color(node.timestamp)

            self.nodes.append(Node(icon_set, tags, flung, None))

        ui.progress_bar(-1, node_count)

        print("Nodes painted in " + str(datetime.now() - start_time) + ".")