Fix parsing; fix code style.

This commit is contained in:
Sergey Vartanov 2020-09-10 00:39:45 +03:00
parent 9b10f29d39
commit 7bfbf32697
9 changed files with 204 additions and 151 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 416 KiB

After

Width:  |  Height:  |  Size: 380 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 171 KiB

After

Width:  |  Height:  |  Size: 246 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 410 KiB

After

Width:  |  Height:  |  Size: 376 KiB

Before After
Before After

View file

@ -1,3 +1,8 @@
"""
Construct Röntgen nodes and ways.
Author: Sergey Vartanov (me@enzet.ru).
"""
import numpy as np
from datetime import datetime
@ -6,9 +11,10 @@ from typing import Any, Dict, List, Optional, Set
from roentgen import ui
from roentgen.extract_icon import DEFAULT_SMALL_SHAPE_ID
from roentgen.flinger import Geo, GeoFlinger
from roentgen.osm_reader import OSMMember, OSMRelation, OSMWay
from roentgen.flinger import GeoFlinger
from roentgen.osm_reader import OSMMember, OSMRelation, OSMWay, OSMNode
from roentgen.scheme import IconSet, Scheme
from roentgen.util import MinMax
DEBUG: bool = False
@ -40,10 +46,12 @@ class Way:
Way in Röntgen terms.
"""
def __init__(
self, kind: str, nodes, path, style: Dict[str, Any],
self, kind: str, nodes: List[OSMNode], path, style: Dict[str, Any],
layer: float = 0.0, priority: float = 0, levels=None):
assert nodes or path
self.kind = kind
self.nodes = nodes
self.nodes: List[OSMNode] = nodes
self.path = path
self.style: Dict[str, Any] = style
self.layer = layer
@ -61,23 +69,20 @@ def get_float(string):
return 0
def line_center(nodes, flinger: GeoFlinger):
def line_center(nodes: List[OSMNode], flinger: GeoFlinger) -> np.array:
"""
Get geometric center of nodes set.
:param nodes: node list
:param flinger: flinger that remap geo positions
"""
ma = [0, 0]
mi = [10000, 10000]
for node in nodes:
flung = flinger.fling(Geo(node.lat, node.lon))
if flung[0] > ma[0]:
ma[0] = flung[0]
if flung[1] > ma[1]:
ma[1] = flung[1]
if flung[0] < mi[0]:
mi[0] = flung[0]
if flung[1] < mi[1]:
mi[1] = flung[1]
return [(ma[0] + mi[0]) / 2.0, (ma[1] + mi[1]) / 2.0]
x, y = MinMax(), MinMax()
for node in nodes: # type: OSMNode
flung = flinger.fling(node.position)
x.add(flung[0])
y.add(flung[1])
return np.array(((x.min_ + x.max_) / 2.0, (y.min_ + y.max_) / 2.0))
def get_user_color(text: str, seed: str):
@ -99,12 +104,12 @@ def get_user_color(text: str, seed: str):
return "#" + "0" * (6 - len(h)) + h
def get_time_color(time: datetime):
def get_time_color(time: Optional[datetime]):
"""
Generate color based on time.
"""
if not time:
return "#000000"
if time is None:
return "000000"
delta = (datetime.now() - time).total_seconds()
time_color = hex(0xFF - min(0xFF, int(delta / 500000.)))[2:]
i_time_color = hex(min(0xFF, int(delta / 500000.)))[2:]
@ -115,11 +120,13 @@ def get_time_color(time: datetime):
return "#" + time_color + "AA" + i_time_color
def glue(ways: List[OSMWay]):
def glue(ways: List[OSMWay]) -> List[List[OSMNode]]:
"""
Try to glue ways that share nodes.
:param ways: ways to glue
"""
result: List[List[int]] = []
result: List[List[OSMNode]] = []
to_process: Set[OSMWay] = set()
for way in ways: # type: OSMWay
@ -149,17 +156,16 @@ def glue(ways: List[OSMWay]):
return result
def get_path(nodes, shift, map_, flinger: GeoFlinger):
def get_path(nodes: List[OSMNode], shift: np.array, flinger: GeoFlinger) -> str:
"""
Construct SVG path from nodes.
"""
path = ""
prev_node = None
for node_id in nodes:
node = map_.node_map[node_id]
flung = np.add(flinger.fling(Geo(node.lat, node.lon)), shift)
for node in nodes:
flung = flinger.fling(node.position) + shift
path += ("L" if prev_node else "M") + f" {flung[0]},{flung[1]} "
prev_node = map_.node_map[node_id]
prev_node = node
if nodes[0] == nodes[-1]:
path += "Z"
else:
@ -198,10 +204,12 @@ class Constructor:
"""
Way construction.
:param way: OSM way.
:param tags: way tag dictionary.
:param path: way path (if there is no nodes).
:param way: OSM way
:param tags: way tag dictionary
:param path: way path (if there is no nodes)
"""
assert way or path
layer: float = 0
level: float = 0
@ -209,7 +217,7 @@ class Constructor:
layer = get_float(tags["layer"])
if "level" in tags:
try:
levels = list(map(lambda x: float(x), tags["level"].split(";")))
levels = list(map(float, tags["level"].split(";")))
level = sum(levels) / len(levels)
except ValueError:
pass
@ -221,8 +229,7 @@ class Constructor:
center_point = None
if way:
center_point = line_center(
map(lambda x: self.map_.node_map[x], way.nodes), self.flinger)
center_point = line_center(way.nodes, self.flinger)
nodes = way.nodes
if self.mode == "user-coloring":
@ -255,7 +262,10 @@ class Constructor:
if "building" in tags:
kind = "building"
if "building:levels" in tags:
levels = float(tags["building:levels"])
try:
levels = float(tags["building:levels"])
except ValueError:
levels = None
for element in self.scheme.ways: # type: Dict[str, Any]
matched: bool = True
@ -286,7 +296,7 @@ class Constructor:
style[key] = value
self.ways.append(
Way(kind, nodes, path, style, layer, 50, levels))
if center_point and way.is_cycle() or \
if center_point is not None and way.is_cycle() or \
"area" in tags and tags["area"]:
icon_set: IconSet = self.scheme.get_icon(tags)
self.nodes.append(Node(
@ -297,7 +307,7 @@ class Constructor:
style: Dict[str, Any] = {
"fill": "none", "stroke": "#FF0000", "stroke-width": 1}
self.ways.append(Way(kind, nodes, path, style, layer, 50, levels))
if center_point and way.is_cycle() or \
if center_point is not None and way.is_cycle() or \
"area" in tags and tags["area"]:
icon_set: IconSet = self.scheme.get_icon(tags)
self.nodes.append(Node(
@ -326,13 +336,14 @@ class Constructor:
inners_path = glue(inners)
outers_path = glue(outers)
for nodes in outers_path:
path = get_path(nodes, [0, 0], self.map_, self.flinger)
path = get_path(nodes, np.array([0, 0]), self.flinger)
p += path + " "
for nodes in inners_path:
nodes.reverse()
path = get_path(nodes, [0, 0], self.map_, self.flinger)
path = get_path(nodes, np.array([0, 0]), self.flinger)
p += path + " "
self.construct_way(None, tags, p)
if p:
self.construct_way(None, tags, p)
def construct_nodes(self) -> None:
"""
@ -345,13 +356,14 @@ class Constructor:
node_number: int = 0
s = sorted(
self.map_.node_map.keys(), key=lambda x: -self.map_.node_map[x].lat)
self.map_.node_map.keys(),
key=lambda x: -self.map_.node_map[x].position.lat)
for node_id in s: # type: int
node_number += 1
ui.progress_bar(node_number, len(self.map_.node_map))
node = self.map_.node_map[node_id]
flung = self.flinger.fling(Geo(node.lat, node.lon))
node: OSMNode = self.map_.node_map[node_id]
flung = self.flinger.fling(node.position)
tags = node.tags
if not self.check_level(tags):
@ -363,6 +375,7 @@ class Constructor:
if not tags:
continue
icon_set.icons = [[DEFAULT_SMALL_SHAPE_ID]]
break
if self.mode == "user-coloring":
icon_set.color = get_user_color(node.user, self.seed)
if self.mode == "time":

View file

@ -28,6 +28,8 @@ class Icon:
:param offset: vector that should be used to shift the path
:param id_: shape identifier
"""
assert path
self.path: str = path
self.offset: np.array = offset
self.id_: str = id_

View file

@ -95,7 +95,7 @@ class GeoFlinger:
self.space = space
def fling(self, current):
def fling(self, current) -> np.array:
"""
:param current: vector to fling
"""
@ -106,4 +106,4 @@ class GeoFlinger:
self.maximum.lat + self.minimum.lat - current.lat,
self.minimum.lat, self.maximum.lat,
self.target_minimum[1], self.target_maximum[1])
return [x, y]
return np.array([x, y])

View file

@ -12,7 +12,7 @@ from svgwrite.container import Group
from svgwrite.path import Path
from svgwrite.shapes import Circle, Rect
from svgwrite.text import Text
from typing import Dict, List
from typing import Any, Dict, List
from roentgen import ui
from roentgen.address import get_address
@ -243,10 +243,8 @@ class Painter:
if way.nodes:
for i in range(len(way.nodes) - 1):
node_1 = self.map_.node_map[way.nodes[i]]
node_2 = self.map_.node_map[way.nodes[i + 1]]
flung_1 = self.flinger.fling(Geo(node_1.lat, node_1.lon))
flung_2 = self.flinger.fling(Geo(node_2.lat, node_2.lon))
flung_1 = self.flinger.fling(way.nodes[i].position)
flung_2 = self.flinger.fling(way.nodes[i + 1].position)
self.svg.add(self.svg.path(
d=("M", np.add(flung_1, shift_1), "L",
@ -265,7 +263,7 @@ class Painter:
for way in ways:
if way.kind == "way":
if way.nodes:
path = get_path(way.nodes, [0, 0], self.map_, self.flinger)
path = get_path(way.nodes, np.array([0, 0]), self.flinger)
p = Path(d=path)
p.update(way.style)
self.svg.add(p)
@ -285,10 +283,8 @@ class Painter:
if way.levels:
shift = [-5 * way.levels, 5 * way.levels]
for i in range(len(way.nodes) - 1):
node_1 = self.map_.node_map[way.nodes[i]]
node_2 = self.map_.node_map[way.nodes[i + 1]]
flung_1 = self.flinger.fling(Geo(node_1.lat, node_1.lon))
flung_2 = self.flinger.fling(Geo(node_2.lat, node_2.lon))
flung_1 = self.flinger.fling(way.nodes[i].position)
flung_2 = self.flinger.fling(way.nodes[i + 1].position)
building_shade.add(Path(
("M", flung_1, "L", flung_2, np.add(flung_2, shift),
np.add(flung_1, shift), "Z"),
@ -310,8 +306,8 @@ class Painter:
if way.nodes:
shift = [0, -3]
if way.levels:
shift = [0 * way.levels, min(-3, -1 * way.levels)]
path = get_path(way.nodes, shift, self.map_, self.flinger)
shift = np.array([0 * way.levels, min(-3, -1 * way.levels)])
path = get_path(way.nodes, shift, self.flinger)
p = Path(d=path, opacity=1)
p.update(way.style)
self.svg.add(p)
@ -400,8 +396,7 @@ class Painter:
path.set_desc(title=title)
self.svg.add(path)
def draw_point_outline(
self, icon: Icon, point, fill, mode="default"):
def draw_point_outline(self, icon: Icon, point, fill, mode="default"):
point = np.array(list(map(lambda x: int(x), point)))
@ -425,10 +420,12 @@ class Painter:
self.svg.add(path)
def check_level_number(tags, level):
def check_level_number(tags: Dict[str, Any], level: float):
"""
Check if element described by tags is no the specified level.
"""
if "level" in tags:
levels = \
map(lambda x: float(x), tags["level"].replace(",", ".").split(";"))
levels = map(float, tags["level"].replace(",", ".").split(";"))
if level not in levels:
return False
else:
@ -436,19 +433,26 @@ def check_level_number(tags, level):
return True
def check_level_overground(tags):
def check_level_overground(tags: Dict[str, Any]):
"""
Check if element described by tags is overground.
"""
if "level" in tags:
levels = \
map(lambda x: float(x), tags["level"].replace(",", ".").split(";"))
for level in levels:
if level <= 0:
return False
try:
levels = map(float, tags["level"].replace(",", ".").split(";"))
for level in levels:
if level <= 0:
return False
except ValueError:
pass
if "layer" in tags:
levels = \
map(lambda x: float(x), tags["layer"].replace(",", ".").split(";"))
for level in levels:
if level <= 0:
return False
try:
levels = map(float, tags["layer"].replace(",", ".").split(";"))
for level in levels:
if level <= 0:
return False
except ValueError:
pass
if "parking" in tags and tags["parking"] == "underground":
return False
return True

View file

@ -1,12 +1,16 @@
"""
Reading OpenStreetMap data from XML file.
Author: Sergey Vartanov
Author: Sergey Vartanov (me@enzet.ru).
"""
from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Set, Union
from roentgen import ui
from roentgen.flinger import Geo
from roentgen.ui import progress_bar
from roentgen.util import MinMax
OSM_TIME_PATTERN: str = "%Y-%m-%dT%H:%M:%SZ"
class OSMNode:
@ -15,15 +19,14 @@ class OSMNode:
See https://wiki.openstreetmap.org/wiki/Node
"""
def __init__(self, id_: int = 0, lat: float = 0, lon: float = 0):
self.id_: int = id_
self.lat: float = lat
self.lon: float = lon
def __init__(self):
self.id_: Optional[int] = None
self.position: Optional[Geo] = None
self.tags: Dict[str, str] = {}
self.visible: Optional[str] = None
self.changeset: Optional[str] = None
self.timestamp: Optional[str] = None
self.timestamp: Optional[datetime] = None
self.user: Optional[str] = None
self.uid: Optional[str] = None
@ -35,13 +38,14 @@ class OSMNode:
:param is_full: if false, parse only ID, latitude and longitude
"""
self.id_ = int(get_value("id", text))
self.lat = float(get_value("lat", text))
self.lon = float(get_value("lon", text))
self.position = Geo(
float(get_value("lat", text)), float(get_value("lon", text)))
if is_full:
self.visible = get_value("visible", text)
self.changeset = get_value("changeset", text)
self.timestamp = get_value("timestamp", text)
self.timestamp = datetime.strptime(
get_value("timestamp", text), OSM_TIME_PATTERN)
self.user = get_value("user", text)
self.uid = get_value("uid", text)
@ -54,9 +58,9 @@ class OSMWay:
See https://wiki.openstreetmap.org/wiki/Way
"""
def __init__(self, id_: int = 0, nodes=None):
def __init__(self, id_: int = 0, nodes: Optional[List[OSMNode]] = None):
self.id_: int = id_
self.nodes: List[int] = [] if nodes is None else nodes
self.nodes: List[OSMNode] = [] if nodes is None else nodes
self.tags: Dict[str, str] = {}
self.visible: Optional[str] = None
@ -78,7 +82,7 @@ class OSMWay:
self.visible = get_value("visible", text)
self.changeset = get_value("changeset", text)
self.timestamp = datetime.strptime(
get_value("timestamp", text), "%Y-%m-%dT%H:%M:%SZ")
get_value("timestamp", text), OSM_TIME_PATTERN)
self.user = get_value("user", text)
self.uid = get_value("uid", text)
@ -134,9 +138,9 @@ class OSMMember:
Member of OpenStreetMap relation.
"""
def __init__(self, text: str):
self.type_ = get_value("type", text)
self.ref = int(get_value("ref", text))
self.role = get_value("role", text)
self.type_: str = get_value("type", text)
self.ref: int = int(get_value("ref", text))
self.role: str = get_value("role", text)
def get_value(key: str, text: str):
@ -144,8 +148,9 @@ def get_value(key: str, text: str):
Parse xml value from the tag in the format of key="value".
"""
if key + '="' in text:
index: int = text.find(key + '="')
value = text[index + len(key) + 2:text.find('"', index + len(key) + 4)]
start_index: int = text.find(key + '="') + 2
end_index: int = start_index + len(key)
value = text[end_index:text.find('"', end_index + 2)]
return value
@ -158,6 +163,33 @@ class Map:
self.way_map: Dict[int, OSMWay] = {}
self.relation_map: Dict[int, OSMRelation] = {}
self.authors: Set[str] = set()
self.time: MinMax = MinMax()
def add_node(self, node: OSMNode):
"""
Add node and update map parameters.
"""
self.node_map[node.id_] = node
if node.user:
self.authors.add(node.user)
self.time.add(node.timestamp)
def add_way(self, way: OSMWay):
"""
Add way and update map parameters.
"""
self.way_map[way.id_] = way
if way.user:
self.authors.add(way.user)
self.time.add(way.timestamp)
def add_relation(self, relation: OSMRelation):
"""
Add relation and update map parameters.
"""
self.relation_map[relation.id_] = relation
class OSMReader:
"""
@ -176,75 +208,75 @@ class OSMReader:
lines_number: int = sum(1 for _ in open(file_name))
print(f"Parsing OSM file {file_name}...")
input_file = open(file_name)
line = input_file.readline()
line_number = 0
line_number: int = 0
element = None
element: Optional[Union[OSMNode, OSMWay, OSMRelation]] = None
while line != "":
line_number += 1
ui.progress_bar(line_number, lines_number)
with open(file_name) as input_file:
for line in input_file.readlines(): # type: str
# Node parsing.
line = line.strip()
if line[:6] in [" <node", "\t<node"] or line[:7] == " <node":
if not parse_nodes:
if parse_ways or parse_relations:
continue
break
if line[-3] == "/":
node: OSMNode = OSMNode().parse_from_xml(line[7:-3], full)
self.map_.node_map[node.id_] = node
else:
element = OSMNode().parse_from_xml(line[7:-2], full)
elif line in [" </node>\n", "\t</node>\n", " </node>\n"]:
self.map_.node_map[element.id_] = element
line_number += 1
progress_bar(line_number, lines_number)
# Way parsing.
# Node parsing.
elif line[:5] in [' <way', '\t<way'] or line[:6] == " <way":
if not parse_ways:
if parse_relations:
continue
break
if line[-3] == '/':
way = OSMWay().parse_from_xml(line[6:-3], full)
self.map_.way_map[way.id_] = way
else:
element = OSMWay().parse_from_xml(line[6:-2], full)
elif line in [' </way>\n', '\t</way>\n'] or line == " </way>\n":
self.map_.way_map[element.id_] = element
if line.startswith("<node"):
if not parse_nodes:
if parse_ways or parse_relations:
continue
else:
break
if line[-2] == "/":
node: OSMNode = OSMNode().parse_from_xml(line, full)
self.map_.add_node(node)
else:
element = OSMNode().parse_from_xml(line, full)
elif line == "</node>":
self.map_.add_node(element)
# Relation parsing.
# Way parsing.
elif line[:10] in [" <relation", "\t<relation"] or \
line[:11] == " <relation":
if not parse_relations:
break
if line[-3] == "/":
relation = OSMRelation().parse_from_xml(line[11:-3])
self.map_.relation_map[relation.id_] = relation
else:
element = OSMRelation().parse_from_xml(line[11:-2])
elif line in [" </relation>\n", "\t</relation>\n"] or \
line == " </relation>\n":
self.map_.relation_map[element.id_] = element
elif line.startswith("<way"):
if not parse_ways:
if parse_relations:
continue
else:
break
if line[-2] == "/":
way = OSMWay().parse_from_xml(line, full)
self.map_.add_way(way)
else:
element = OSMWay().parse_from_xml(line, full)
elif line == "</way>":
self.map_.add_way(element)
# Elements parsing.
# Relation parsing.
elif line[:6] in [" <tag", "\t\t<tag"] or line[:8] == " <tag":
k = get_value("k", line[7:-3])
v = get_value("v", line[7:-3])
element.tags[k] = v
elif line[:5] in [" <nd", "\t\t<nd"] or line[:7] == " <nd":
element.nodes.append(int(get_value("ref", line)))
elif line[:9] in [" <member", "\t\t<member"] or \
line[:11] == " <member":
element.members.append(OSMMember(line[10:-3]))
line = input_file.readline()
input_file.close()
elif line.startswith("<relation"):
if not parse_relations:
break
if line[-2] == "/":
relation = OSMRelation().parse_from_xml(line)
self.map_.add_relation(relation)
else:
element = OSMRelation().parse_from_xml(line)
elif line == "</relation>":
self.map_.add_relation(element)
ui.progress_bar(-1, lines_number) # Complete progress bar.
# Elements parsing.
elif line.startswith("<tag"):
k = get_value("k", line)
v = get_value("v", line)
element.tags[k] = v
elif line.startswith("<nd"):
element.nodes.append(
self.map_.node_map[int(get_value("ref", line))])
elif line.startswith("<member"):
element.members.append(OSMMember(line))
progress_bar(-1, lines_number) # Complete progress bar.
return self.map_

View file

@ -72,6 +72,8 @@ class Scheme:
"""
if color in self.colors:
return "#" + self.colors[color]
if color.lower() in self.colors:
return "#" + self.colors[color.lower()]
if color.startswith("#"):
return color