# Mirror of https://github.com/enzet/map-machine.git
# (synced 2025-05-02 11:46:41 +02:00; 467 lines, 14 KiB, Python).
"""
Parse OSM XML file.
"""
|
|
import json
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
from xml.etree import ElementTree
|
|
from xml.etree.ElementTree import Element
|
|
|
|
import numpy as np
|
|
|
|
from map_machine.geometry.boundary_box import BoundaryBox
|
|
from map_machine.util import MinMax
|
|
|
|
__author__ = "Sergey Vartanov"
__email__ = "me@enzet.ru"

# `datetime.strptime` format used for the `timestamp` attribute of OSM XML
# elements.
OSM_TIME_PATTERN: str = "%Y-%m-%dT%H:%M:%SZ"

# Length values with an explicit unit suffix.  The `value` group captures the
# numeric part; optional whitespace may separate it from the unit.
METERS_PATTERN: re.Pattern = re.compile(r"^(?P<value>\d*\.?\d*)\s*m$")
KILOMETERS_PATTERN: re.Pattern = re.compile(r"^(?P<value>\d*\.?\d*)\s*km$")
MILES_PATTERN: re.Pattern = re.compile(r"^(?P<value>\d*\.?\d*)\s*mi$")

# Initial value of `OSMData.equator_length` (presumably in meters).
EARTH_EQUATOR_LENGTH: float = 40_075_017.0

# Tag key to tag value mapping.
Tags = dict[str, str]

# See https://wiki.openstreetmap.org/wiki/Lifecycle_prefix#Stages_of_decay
STAGES_OF_DECAY: list[str] = [
    "disused",
    "abandoned",
    "ruins",
    "demolished",
    "removed",
    "razed",
    "destroyed",
    "was",  # is not actually a stage of decay
]
|
|
|
|
|
|
def parse_float(string: str) -> Optional[float]:
    """
    Parse string representation of a float or integer value.

    :param string: text to convert
    :return: parsed value, or None if `float()` rejects the argument (either
        its text is not a number or its type is not convertible)
    """
    try:
        return float(string)
    except (TypeError, ValueError):
        return None
|
|
|
|
|
|
def parse_levels(string: str) -> list[float]:
    """
    Parse string representation of level sequence value.

    Levels are separated by `;`.  Every `,` is treated as a decimal point
    before the parts are converted.

    :param string: level description, e.g. `1;2` or `0,5`
    :return: parsed levels, or an empty list (with a logged warning) if any
        part is not a number
    """
    # TODO: add `-` parsing
    try:
        return [float(part) for part in string.replace(",", ".").split(";")]
    except ValueError:
        logging.warning(f"Cannot parse level description from `{string}`.")
        return []
|
|
|
|
|
|
@dataclass
class Tagged:
    """Something with tags (string to string mapping)."""

    tags: Tags

    def get_tag(self, key: str) -> Optional[str]:
        """
        Get tag value or None if it doesn't exist.

        :param key: tag key
        :return: tag value or None
        """
        return self.tags.get(key)

    def get_float(self, key: str) -> Optional[float]:
        """
        Parse float from tag value.

        :param key: tag key
        :return: parsed value, or None if the tag is absent or is not a number
        """
        if key in self.tags:
            return parse_float(self.tags[key])
        return None

    def get_length(self, key: str) -> Optional[float]:
        """
        Get length in meters.

        A value that is a bare number is returned as is.  Otherwise the value
        is matched against the meter (`m`), kilometer (`km`), and mile (`mi`)
        patterns and converted with the corresponding ratio.

        :param key: tag key
        :return: length, or None if the tag is absent or cannot be parsed
        """
        if key not in self.tags:
            return None

        value: str = self.tags[key]

        float_value: Optional[float] = parse_float(value)
        if float_value is not None:
            return float_value

        for pattern, ratio in [
            (METERS_PATTERN, 1.0),
            (KILOMETERS_PATTERN, 1000.0),
            (MILES_PATTERN, 1609.344),
        ]:
            matcher: Optional[re.Match] = pattern.match(value)
            if matcher:
                # The numeric group may be empty (e.g. value `m`); in that
                # case `parse_float` returns None and the next unit is tried.
                float_value = parse_float(matcher.group("value"))
                if float_value is not None:
                    return float_value * ratio

        return None

    def verify(self) -> bool:
        """
        Check key and value types.

        A warning is logged for every key or value that is not a string.

        :return: True if all keys and values are strings
        """
        is_well_formed: bool = True

        # `dict.items()` yields `(key, value)` pairs, in that order.
        for key, value in self.tags.items():
            if not isinstance(key, str):
                logging.warning(f"Not string key {key}.")
                is_well_formed = False
            if not isinstance(value, str):
                logging.warning(f"Not string value {value}.")
                is_well_formed = False

        return is_well_formed
|
|
|
|
|
|
@dataclass
class OSMNode(Tagged):
    """
    OpenStreetMap node.

    See https://wiki.openstreetmap.org/wiki/Node
    """

    id_: int
    # Both constructors below store the pair as (latitude, longitude).
    coordinates: np.ndarray
    visible: Optional[str] = None
    changeset: Optional[str] = None
    timestamp: Optional[datetime] = None
    user: Optional[str] = None
    uid: Optional[str] = None

    @classmethod
    def from_xml_structure(cls, element: Element) -> "OSMNode":
        """
        Parse node from OSM XML `<node>` element.

        The `id`, `lat`, and `lon` attributes are required; `visible`,
        `changeset`, `timestamp`, `user`, and `uid` are optional and default
        to None.

        :param element: XML `<node>` element with optional `<tag>` children
        """
        attributes = element.attrib
        tags: Tags = {
            x.attrib["k"]: x.attrib["v"] for x in element if x.tag == "tag"
        }
        timestamp: Optional[datetime] = None
        if "timestamp" in attributes:
            timestamp = datetime.strptime(
                attributes["timestamp"], OSM_TIME_PATTERN
            )
        return cls(
            tags,
            int(attributes["id"]),
            np.array((float(attributes["lat"]), float(attributes["lon"]))),
            attributes.get("visible"),
            attributes.get("changeset"),
            timestamp,
            attributes.get("user"),
            attributes.get("uid"),
        )

    @classmethod
    def parse_from_structure(cls, structure: dict[str, Any]) -> "OSMNode":
        """
        Parse node from Overpass-like structure.

        :param structure: input structure with `id`, `lat`, and `lon` keys and
            an optional `tags` key
        """
        return cls(
            structure.get("tags", {}),
            structure["id"],
            coordinates=np.array((structure["lat"], structure["lon"])),
        )

    def __hash__(self) -> int:
        # Hash by the node identifier only.
        return self.id_
|
|
|
|
|
|
@dataclass
class OSMWay(Tagged):
    """
    OpenStreetMap way.

    See https://wiki.openstreetmap.org/wiki/Way
    """

    id_: int
    nodes: Optional[list[OSMNode]] = field(default_factory=list)
    visible: Optional[str] = None
    changeset: Optional[str] = None
    timestamp: Optional[datetime] = None
    user: Optional[str] = None
    uid: Optional[str] = None

    @classmethod
    def from_xml_structure(
        cls, element: Element, nodes: dict[int, OSMNode]
    ) -> "OSMWay":
        """
        Parse way from OSM XML `<way>` element.

        :param element: XML `<way>` element with `<nd>` and `<tag>` children
        :param nodes: already parsed nodes by identifier; every `<nd>`
            reference is looked up here, so a missing node raises `KeyError`
        """
        attributes = element.attrib
        tags: Tags = {
            x.attrib["k"]: x.attrib["v"] for x in element if x.tag == "tag"
        }
        timestamp: Optional[datetime] = None
        if "timestamp" in attributes:
            timestamp = datetime.strptime(
                attributes["timestamp"], OSM_TIME_PATTERN
            )
        return cls(
            tags,
            int(attributes["id"]),
            [nodes[int(x.attrib["ref"])] for x in element if x.tag == "nd"],
            attributes.get("visible"),
            attributes.get("changeset"),
            timestamp,
            attributes.get("user"),
            attributes.get("uid"),
        )

    @classmethod
    def parse_from_structure(
        cls, structure: dict[str, Any], nodes: dict[int, OSMNode]
    ) -> "OSMWay":
        """
        Parse way from Overpass-like structure.

        :param structure: input structure with `id` and `nodes` keys and an
            optional `tags` key
        :param nodes: node structure (parsed nodes by identifier)
        """
        return cls(
            structure.get("tags", {}),
            structure["id"],
            [nodes[x] for x in structure["nodes"]],
        )

    def is_cycle(self) -> bool:
        """
        Is way a cycle way or an area boundary.

        :return: True if the first and the last nodes are equal; False for a
            way without nodes
        """
        if not self.nodes:
            return False
        return self.nodes[0] == self.nodes[-1]

    def __repr__(self) -> str:
        return f"Way <{self.id_}> {self.nodes}"
|
|
|
|
|
|
@dataclass
class OSMMember:
    """Member of OpenStreetMap relation."""

    # Value of the member's `type` attribute.
    type_: str
    # Identifier of the referenced element.
    ref: int
    # Value of the member's `role` attribute.
    role: str
|
|
|
|
|
|
@dataclass
class OSMRelation(Tagged):
    """
    OpenStreetMap relation.

    See https://wiki.openstreetmap.org/wiki/Relation
    """

    id_: int
    members: Optional[list[OSMMember]]
    visible: Optional[str] = None
    changeset: Optional[str] = None
    timestamp: Optional[datetime] = None
    user: Optional[str] = None
    uid: Optional[str] = None

    @classmethod
    def from_xml_structure(cls, element: Element) -> "OSMRelation":
        """
        Parse relation from OSM XML `<relation>` element.

        :param element: XML `<relation>` element with `<member>` and `<tag>`
            children
        """
        attributes = element.attrib
        members: list[OSMMember] = []
        tags: Tags = {}
        for subelement in element:
            if subelement.tag == "member":
                subattributes = subelement.attrib
                members.append(
                    OSMMember(
                        subattributes["type"],
                        int(subattributes["ref"]),
                        subattributes["role"],
                    )
                )
            elif subelement.tag == "tag":
                tags[subelement.attrib["k"]] = subelement.attrib["v"]
        timestamp: Optional[datetime] = None
        if "timestamp" in attributes:
            timestamp = datetime.strptime(
                attributes["timestamp"], OSM_TIME_PATTERN
            )
        return cls(
            tags,
            int(attributes["id"]),
            members,
            attributes.get("visible"),
            attributes.get("changeset"),
            timestamp,
            attributes.get("user"),
            attributes.get("uid"),
        )

    @classmethod
    def parse_from_structure(cls, structure: dict[str, Any]) -> "OSMRelation":
        """
        Parse relation from Overpass-like structure.

        :param structure: input structure with `id` and `members` keys and an
            optional `tags` key
        """
        return cls(
            # A missing `tags` key means no tags, as for nodes and ways.
            structure.get("tags", {}),
            structure["id"],
            [
                OSMMember(x["type"], x["ref"], x["role"])
                for x in structure["members"]
            ],
        )
|
|
|
|
|
|
class NotWellFormedOSMDataException(Exception):
    """OSM data structure is not well-formed."""
|
|
|
|
|
|
class OSMData:
    """The whole OpenStreetMap information about nodes, ways, and relations."""

    def __init__(self) -> None:
        # Parsed elements by identifier.
        self.nodes: dict[int, OSMNode] = {}
        self.ways: dict[int, OSMWay] = {}
        self.relations: dict[int, OSMRelation] = {}

        # Map parameters accumulated while elements are added.
        self.authors: set[str] = set()
        self.levels: set[float] = set()
        self.time: MinMax = MinMax()
        self.view_box: Optional[BoundaryBox] = None
        self.equator_length: float = EARTH_EQUATOR_LENGTH

    def add_node(self, node: OSMNode) -> None:
        """
        Add node and update map parameters.

        :param node: node to add
        :raises NotWellFormedOSMDataException: if a node with the same
            identifier was already added
        """
        if node.id_ in self.nodes:
            raise NotWellFormedOSMDataException(
                f"Node with duplicate id {node.id_}."
            )
        self.nodes[node.id_] = node
        if node.user:
            self.authors.add(node.user)
        if node.tags.get("level"):
            # `set.union` would return a new set and leave `levels` untouched.
            self.levels.update(parse_levels(node.tags["level"]))
        if node.timestamp:
            self.time.update(node.timestamp)

    def add_way(self, way: OSMWay) -> None:
        """
        Add way and update map parameters.

        :param way: way to add
        :raises NotWellFormedOSMDataException: if a way with the same
            identifier was already added
        """
        if way.id_ in self.ways:
            raise NotWellFormedOSMDataException(
                f"Way with duplicate id {way.id_}."
            )
        self.ways[way.id_] = way
        if way.user:
            self.authors.add(way.user)
        if way.tags.get("level"):
            # `set.union` would return a new set and leave `levels` untouched.
            self.levels.update(parse_levels(way.tags["level"]))
        if way.timestamp:
            self.time.update(way.timestamp)

    def add_relation(self, relation: OSMRelation) -> None:
        """
        Add relation and update map parameters.

        :param relation: relation to add
        :raises NotWellFormedOSMDataException: if a relation with the same
            identifier was already added
        """
        if relation.id_ in self.relations:
            raise NotWellFormedOSMDataException(
                f"Relation with duplicate id {relation.id_}."
            )
        self.relations[relation.id_] = relation

    def parse_overpass(self, file_name: Path) -> None:
        """
        Parse JSON structure extracted from Overpass API.

        Nodes are processed first, then ways, then relations, so that ways
        can reference nodes that appear later in the file.  The view box is
        extended with every node.

        See https://wiki.openstreetmap.org/wiki/Overpass_API

        :param file_name: input JSON file
        """
        with file_name.open(encoding="utf-8") as input_file:
            structure = json.load(input_file)

        # Nodes of this file only; ways are resolved against this mapping.
        node_map: dict[int, OSMNode] = {}

        for element in structure["elements"]:
            if element["type"] == "node":
                node = OSMNode.parse_from_structure(element)
                node_map[node.id_] = node
                self.add_node(node)
                if not self.view_box:
                    # Node coordinates are (latitude, longitude); the box is
                    # created from the (longitude, latitude) of the node.
                    self.view_box = BoundaryBox(
                        node.coordinates[1],
                        node.coordinates[0],
                        node.coordinates[1],
                        node.coordinates[0],
                    )
                self.view_box.update(node.coordinates)

        for element in structure["elements"]:
            if element["type"] == "way":
                self.add_way(OSMWay.parse_from_structure(element, node_map))

        for element in structure["elements"]:
            if element["type"] == "relation":
                self.add_relation(OSMRelation.parse_from_structure(element))

    def parse_osm_file(self, file_name: Path) -> None:
        """
        Parse OSM XML file and add its content to this data.

        See https://wiki.openstreetmap.org/wiki/OSM_XML

        :param file_name: input XML file
        """
        self.parse_osm(ElementTree.parse(file_name).getroot())

    def parse_osm_text(self, text: str) -> None:
        """
        Parse OSM XML data from text representation and add it to this data.

        :param text: XML text representation
        """
        self.parse_osm(ElementTree.fromstring(text))

    def parse_osm(
        self,
        root: Element,
        parse_nodes: bool = True,
        parse_ways: bool = True,
        parse_relations: bool = True,
    ) -> None:
        """
        Parse OSM XML data.

        Elements are processed in document order; ways are resolved against
        the nodes added so far.

        :param root: top element of XML data
        :param parse_nodes: whether nodes should be parsed
        :param parse_ways: whether ways should be parsed
        :param parse_relations: whether relations should be parsed
        """
        for element in root:
            if element.tag == "bounds":
                self.parse_bounds(element)
            elif element.tag == "object":
                self.parse_object(element)
            elif element.tag == "node" and parse_nodes:
                self.add_node(OSMNode.from_xml_structure(element))
            elif element.tag == "way" and parse_ways:
                self.add_way(OSMWay.from_xml_structure(element, self.nodes))
            elif element.tag == "relation" and parse_relations:
                self.add_relation(OSMRelation.from_xml_structure(element))

    def parse_bounds(self, element: Element) -> None:
        """
        Parse view box from XML element.

        The parsed box is combined with the current view box if there is
        one, otherwise it becomes the view box.

        :param element: element with `minlon`, `minlat`, `maxlon`, and
            `maxlat` attributes
        """
        attributes = element.attrib
        boundary_box: BoundaryBox = BoundaryBox(
            float(attributes["minlon"]),
            float(attributes["minlat"]),
            float(attributes["maxlon"]),
            float(attributes["maxlat"]),
        )
        if self.view_box:
            self.view_box.combine(boundary_box)
        else:
            self.view_box = boundary_box

    def parse_object(self, element: Element) -> None:
        """
        Parse astronomical object properties from XML element.

        :param element: element with `equator` attribute; `float()` raises
            `TypeError` if the attribute is missing
        """
        self.equator_length = float(element.get("equator"))
|