""" Reading OpenStreetMap data from XML file. """ import json from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Set, Union import numpy as np from roentgen.ui import progress_bar from roentgen.util import MinMax __author__ = "Sergey Vartanov" __email__ = "me@enzet.ru" OSM_TIME_PATTERN: str = "%Y-%m-%dT%H:%M:%SZ" class Tagged: """ OpenStreetMap element (node, way or relation) with tags. """ def __init__(self): self.tags: Dict[str, str] = {} def get_tag(self, key: str) -> Optional[str]: """ Get tag value or None if it doesn't exist. :param key: tag key :return: tag value or None """ if key in self.tags: return self.tags[key] return None class OSMNode(Tagged): """ OpenStreetMap node. See https://wiki.openstreetmap.org/wiki/Node """ def __init__(self): super().__init__() self.id_: Optional[int] = None self.coordinates: Optional[np.array] = None self.visible: Optional[str] = None self.changeset: Optional[str] = None self.timestamp: Optional[datetime] = None self.user: Optional[str] = None self.uid: Optional[str] = None def parse_from_xml(self, text: str, is_full: bool = False) -> "OSMNode": """ Parse from XML node representation. :param text: XML node representation :param is_full: if false, parse only ID, latitude and longitude """ self.id_ = int(get_value("id", text)) self.coordinates = np.array(( float(get_value("lat", text)), float(get_value("lon", text)))) if is_full: self.visible = get_value("visible", text) self.changeset = get_value("changeset", text) self.timestamp = datetime.strptime( get_value("timestamp", text), OSM_TIME_PATTERN ) self.user = get_value("user", text) self.uid = get_value("uid", text) return self def parse_from_structure(self, structure: Dict[str, Any]) -> "OSMNode": """ Parse node from Overpass-like structure. :param structure: input structure """ self.id_ = structure["id"] self.coordinates = np.array((structure["lat"], structure["lon"])) if "tags" in structure: self.tags = structure["tags"] return self class OSMWay(Tagged): """ OpenStreetMap way. See https://wiki.openstreetmap.org/wiki/Way """ def __init__(self, id_: int = 0, nodes: Optional[List[OSMNode]] = None): super().__init__() self.id_: int = id_ self.nodes: List[OSMNode] = [] if nodes is None else nodes self.visible: Optional[str] = None self.changeset: Optional[str] = None self.user: Optional[str] = None self.timestamp: Optional[datetime] = None self.uid: Optional[str] = None def parse_from_xml(self, text: str, is_full: bool = False) -> "OSMWay": """ Parse way from XML way representation. :param text: XML way representation :param is_full: if false, parse only ID """ self.id_ = int(get_value("id", text)) if is_full: self.visible = get_value("visible", text) self.changeset = get_value("changeset", text) self.timestamp = datetime.strptime( get_value("timestamp", text), OSM_TIME_PATTERN) self.user = get_value("user", text) self.uid = get_value("uid", text) return self def parse_from_structure( self, structure: Dict[str, Any], nodes ) -> "OSMWay": """ Parse way from Overpass-like structure. :param structure: input structure :param nodes: node structure """ self.id_ = structure["id"] for node_id in structure["nodes"]: self.nodes.append(nodes[node_id]) if "tags" in structure: self.tags = structure["tags"] return self def is_cycle(self) -> bool: """ Is way a cycle way or an area boundary. """ return self.nodes[0] == self.nodes[-1] def try_to_glue(self, other: "OSMWay") -> Optional["OSMWay"]: """ Create new combined way if ways share endpoints. """ if self.nodes[0] == other.nodes[0]: return OSMWay(nodes=list(reversed(other.nodes[1:])) + self.nodes) if self.nodes[0] == other.nodes[-1]: return OSMWay(nodes=other.nodes[:-1] + self.nodes) if self.nodes[-1] == other.nodes[-1]: return OSMWay(nodes=self.nodes + list(reversed(other.nodes[:-1]))) if self.nodes[-1] == other.nodes[0]: return OSMWay(nodes=self.nodes + other.nodes[1:]) return None def __repr__(self) -> str: return f"Way <{self.id_}> {self.nodes}" class OSMRelation(Tagged): """ OpenStreetMap relation. See https://wiki.openstreetmap.org/wiki/Relation """ def __init__(self, id_: int = 0): super().__init__() self.id_: int = id_ self.members: List["OSMMember"] = [] self.user: Optional[str] = None self.timestamp: Optional[datetime] = None def parse_from_xml(self, text: str) -> "OSMRelation": """ Parse from XML relation representation. :param text: XML way representation """ self.id_ = int(get_value("id", text)) self.user = get_value("user", text) self.timestamp = datetime.strptime( get_value("timestamp", text), OSM_TIME_PATTERN) return self def parse_from_structure(self, structure: Dict[str, Any]) -> "OSMRelation": """ Parse relation from Overpass-like structure. :param structure: input structure """ self.id_ = structure["id"] for member in structure["members"]: mem = OSMMember() mem.type_ = member["type"] mem.role = member["role"] mem.ref = member["ref"] self.members.append(mem) if "tags" in structure: self.tags = structure["tags"] return self class OSMMember: """ Member of OpenStreetMap relation. """ def __init__(self): self.type_ = "" self.ref = 0 self.role = "" def parse_from_xml(self, text: str) -> "OSMMember": """ Parse relation member from XML way representation. :param text: XML relation member representation """ self.type_: str = get_value("type", text) self.ref: int = int(get_value("ref", text)) self.role: str = get_value("role", text) return self def get_value(key: str, text: str): """ Parse xml value from the tag in the format of key="value". """ if key + '="' in text: start_index: int = text.find(key + '="') + 2 end_index: int = start_index + len(key) value = text[end_index:text.find('"', end_index)] return value return None class Map: """ The whole OpenStreetMap information about nodes, ways, and relations. """ def __init__(self): self.node_map: Dict[int, OSMNode] = {} self.way_map: Dict[int, OSMWay] = {} self.relation_map: Dict[int, OSMRelation] = {} self.authors: Set[str] = set() self.time: MinMax = MinMax() self.boundary_box: List[MinMax] = [MinMax(), MinMax()] def add_node(self, node: OSMNode): """ Add node and update map parameters. """ self.node_map[node.id_] = node if node.user: self.authors.add(node.user) self.time.update(node.timestamp) self.boundary_box[0].update(node.coordinates[0]) self.boundary_box[1].update(node.coordinates[1]) def add_way(self, way: OSMWay): """ Add way and update map parameters. """ self.way_map[way.id_] = way if way.user: self.authors.add(way.user) self.time.update(way.timestamp) def add_relation(self, relation: OSMRelation): """ Add relation and update map parameters. """ self.relation_map[relation.id_] = relation class OverpassReader: """ Reader for JSON structure extracted from Overpass API. See https://wiki.openstreetmap.org/wiki/Overpass_API """ def __init__(self): self.map_ = Map() def parse_json_file(self, file_name: Path) -> Map: """ Parse JSON structure from the file and construct map. """ with file_name.open() as input_file: structure = json.load(input_file) node_map = {} way_map = {} for element in structure["elements"]: if element["type"] == "node": node = OSMNode().parse_from_structure(element) node_map[node.id_] = node self.map_.add_node(node) for element in structure["elements"]: if element["type"] == "way": way = OSMWay().parse_from_structure(element, node_map) way_map[way.id_] = way self.map_.add_way(way) for element in structure["elements"]: if element["type"] == "relation": relation = OSMRelation().parse_from_structure(element) self.map_.add_relation(relation) return self.map_ class OSMReader: """ OSM XML representation reader. """ def __init__(self): self.map_ = Map() def parse_osm_file( self, file_name: Path, parse_nodes: bool = True, parse_ways: bool = True, parse_relations: bool = True, full: bool = False ) -> Map: """ Parse OSM XML representation. :param file_name: input OSM XML file name """ with file_name.open() as input_file: lines_number: int = sum(1 for _ in input_file) print(f"Parsing OSM file {file_name}...") line_number: int = 0 element: Optional[Union[OSMNode, OSMWay, OSMRelation]] = None with file_name.open() as input_file: for line in input_file.readlines(): # type: str line = line.strip() line_number += 1 progress_bar(line_number, lines_number, text="Parsing") # Node parsing. if line.startswith("": self.map_.add_node(element) # Way parsing. elif line.startswith("": self.map_.add_way(element) # Relation parsing. elif line.startswith("": self.map_.add_relation(element) # Elements parsing. elif line.startswith("