download_files_from_LGL_BW.py 5.29 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
LoD2 CityGML tiles are available for whole Baden-Württemberg, from LGL.

https://opengeodata.lgl-bw.de/#/(sidenav:product/12)

This script downloads the requires tiles for given regions
(as WKT strings, in *polygons* variable), and extracts the region.

Required:
* Python
* pyproj project (https://pypi.org/project/pyproj/)
* SimStadt installed on the Desktop (for RegionChooser)

Eric Duminil, 2024
"""
from pathlib import Path
from math import floor
import subprocess
import re
# pip install pyproj
from pyproj import CRS
from pyproj import Transformer
import urllib.request
import time
import zipfile

###### User input ##########
POLYGONS = {
    "StuttgartCenter": "POLYGON((9.175287 48.780916, 9.185501 48.777522, 9.181467 48.773704, 9.174429 48.768472, 9.168807 48.773902, 9.175287 48.780916))",
    # "AnotherRegion": "Another WKT Polygon...",
}
# Should RegionChooser extract the regions from multiple CityGMLs?
EXTRACT_REGIONS = True
############################


CITYGML_SERVER = "https://opengeodata.lgl-bw.de/data/lod2"
RASTER = 2  # [km]
BUNDESLAND = 'bw'
WGS84 = CRS.from_epsg(4326)

LOCAL_CRS = CRS.from_epsg(32632)  # UTM32N, used in BW. https://epsg.io/32632
UTM = 32

SCRIPT_DIR = Path(__file__).parent
WAIT_BETWEEN_DOWNLOADS = 5  # [s] Be nice to LGL Server.

if EXTRACT_REGIONS:
    SIMSTADT_FOLDER = next(Path.home().glob('Desktop/SimStadt*_0.*/'))
    GML_GLOB = "LoD2_*/LoD2_*.gml"


def coordinates_to_grid(longitude: float, latitude: float, local_utm: CRS) -> tuple[int, int]:
    """Returns (x, y) of the tile on CITYGML_SERVER containing a given point."""
    transformer = Transformer.from_crs(WGS84, local_utm, always_xy=True)
    x, y = transformer.transform(longitude, latitude)
    x = floor(x / 1000) - 1  # Odd x
    y = floor(y / 1000)  # Even y
    x -= x % RASTER
    y -= y % RASTER
    return (x + 1, y)


def wkt_polygon_to_grid_coords(location_name: str, wkt: str) -> tuple[int, int, int, int]:
    """Returns (x, y) of lower-left and bottom-right tiles, containing a given region."""
    if 'POLYGON' not in wkt:
        raise ValueError(f"wkt for {location_name} should be a WKT POLYGON or MULTIPOLYGON")

    coordinates = re.findall(r'\-?\d+\.\d+', wkt)

    lons = [float(lon) for lon in coordinates[::2]]
    lats = [float(lat) for lat in coordinates[1::2]]

    min_lon, max_lon = min(lons), max(lons)
    min_lat, max_lat = min(lats), max(lats)

    print("%s (%.3f°N %.3f°E -> %.3f°N %.3f°E)" % (location_name, max_lat, min_lon, min_lat, max_lon))

    x1, y1 = coordinates_to_grid(min_lon, min_lat, LOCAL_CRS)
    x2, y2 = coordinates_to_grid(max_lon, max_lat, LOCAL_CRS)

    return (x1, x2, y1, y2)


def download_rectangle(output_dir: Path, x1: int, x2: int, y1: int, y2: int) -> None:
    """Downloads every zip of a given region, to output_dir, and extracts CityGML files."""
    for x in range(x1, x2 + 1, RASTER):
        for y in range(y1, y2 + 1, RASTER):
            citygml_zip = f"LoD2_{UTM}_{x}_{y}_{RASTER}_{BUNDESLAND}.zip"
            citygml_url = f"{CITYGML_SERVER}/{citygml_zip}"
            local_zip = output_dir / citygml_zip  # .replace('.zip', '.gml')
            if local_zip.exists():
                print(f"  {local_zip.name} already in {output_dir.name}/")
            else:
                print(f"  Download {citygml_zip} to {output_dir.name}/ ", end='')
                urllib.request.urlretrieve(citygml_url, local_zip)
                time.sleep(WAIT_BETWEEN_DOWNLOADS)
                print("✓")

                print(f"    Extract {citygml_zip} to {output_dir.name}/ ", end='')
                print("✓")
                print("")
                with zipfile.ZipFile(local_zip, "r") as zip_ref:
                    zip_ref.extractall(output_dir)


def extract_region(output_dir: Path, location_name: str, wkt: str) -> None:
    """Uses RegionChooser to extract a given region from all the CityGML files found in subfolder."""
    output_file = output_dir / (location_name + '.gml')
    if output_file.exists():
        print(f"  {output_file} already exists. Not extracting.")
        return
    print(f"  Extracting {output_file}.")
    region_chooser_libs = Path(SIMSTADT_FOLDER).expanduser() / 'lib/*'
    gml_inputs = output_dir.glob(GML_GLOB)
116
117
118
119
120
121
    result = subprocess.run(['java', '-classpath', f'{region_chooser_libs}',
                             'eu.simstadt.regionchooser.RegionChooserCLI',
                             '--input', ','.join(str(gml) for gml in gml_inputs),
                             '--output', str(output_file),
                             '--wkt', '-',
                             ],
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
                            input=wkt,
                            text=True,
                            capture_output=True
                            )
    if (result.stderr):
        print(result.stderr)
    print("  DONE!")


def main(polygons: dict[str, str]) -> None:
    """Downloads ZIP files, extracts CityGML files, and selects desired region."""
    for location_name, wkt in polygons.items():
        output_dir = SCRIPT_DIR / (location_name + '.proj')
        output_dir.mkdir(parents=True, exist_ok=True)
        x1, x2, y1, y2 = wkt_polygon_to_grid_coords(location_name, wkt)
        download_rectangle(output_dir, x1, x2, y1, y2)
        if EXTRACT_REGIONS:
            extract_region(output_dir, location_name, wkt)
        print()


if __name__ == '__main__':
    main(POLYGONS)