PADME Train Wiki
Train Selection
Search
Trains
HydrodynamicTrain
816
util.py
Code fragments of util.py
from rasterio.warp import reproject, Resampling
from scipy import interpolate
import rasterio
import numpy as np
from rasterio.transform import from_origin
def merge_tifs(tif_list, out_path, nodata=-9999.):
''' Merges a list of TIF files into a single TIF file. :param tif_list: list of TIF file paths :param out_path: output TIF file path :param nodata: no-data value :return: '''
# List to store file datasets
files = []
# Variables to store max min coordinates
left, bottom, right, top = None, None, None, None
# Read each TIF file and determine total bounding box
for fp in tif_list:
src = rasterio.open(fp)
files.append(src)
l, b, r, t = src.bounds
left = l if left is None else min(left, l)
bottom = b if bottom is None else min(bottom, b)
right = r if right is None else max(right, r)
top = t if top is None else max(top, t)
height = int((top - bottom) / src.res[1])
width = int((right - left) / src.res[0])
num_bands = src.count
data = np.full((num_bands, height, width), nodata, dtype=np.float32)
# fill with nodata value and use the same data type as source
# Copy data from each TIF into the large array
for src in files: # Determine the position of this TIF in the large array
l, b, r, t = src.bounds
row_off = int((top - t) / src.res[1])
col_off = int((l - left) / src.res[0])
# Read this TIF's data into a numpy array
for bidx in range(src.count):
arr = src.read(bidx + 1)
# band index starts from 1
# Only copy data that is not the no-data value
mask = (arr != nodata)
# create mask of valid data
# get row, col indices of valid data
rows, cols = np.nonzero(mask)
data[bidx, row_off + rows, col_off + cols] = arr[rows, cols]
# Close the files
for src in files:
src.close()
# Define the metadata
out_meta = src.meta.copy()
out_meta.update({ "driver": "GTiff", "height": height, "width": width, "transform": from_origin(left, top, src.res[0], src.res[1]), "crs": src.crs, "nodata": nodata, "dtype": np.float32 })
# Write the large array to a new TIF
with rasterio.open(out_path, 'w', **out_meta) as dest:
dest.write(data.astype(np.float32))
def interpolate_array(data, xnew, ynew, new_res):
''' Interpolates a 2D array to a new resolution :param data: 2D array :param xnew: new x resolution :param ynew: new y resolution :param new_res: new resolution :return: interpolated 2D array '''
old_shape = data.shape
# Padding the array
pad_x = (new_res - (old_shape[1] % new_res)) % new_res
pad_y = (new_res - (old_shape[0] % new_res)) % new_res
data = np.pad(data, ((0, pad_y), (0, pad_x)))
x = np.linspace(0, old_shape[1] + pad_x, old_shape[1] + pad_x)
y = np.linspace(0, old_shape[0] + pad_y, old_shape[0] + pad_y)
f = interpolate.interp2d(x, y, data, kind='linear')
data = f(xnew, ynew)
# data[data < 0] = 0.
return data
def resample_tif(path_1, path_2, out_path, is_rainfall=False):
''' Resamples a geotiff file to match another geotiff file :param path_1: The source geotiff :param path_2: The target geotiff file :param out_path: The output geotiff file :return: '''
# Read the source geotiff
with rasterio.open(path_1) as src:
src_data = src.read()
src_meta = src.meta
# Read the target geotiff file
with rasterio.open(path_2) as dst:
dst_data = dst.read()
dst_meta = dst.meta
def bbox_contains(bbox1, bbox2):
return bbox1.left >= bbox2.left and bbox1.right <= bbox2.right and bbox1.bottom >= bbox2.bottom and bbox1.top <= bbox2.top
# Check if the bounding boxes overlap and the target geotiff is within source geotiff area
if not bbox_contains(dst.bounds, src.bounds):
raise ValueError('The target GeoTIFF is not entirely within the source GeoTIFF area')
# Initialize the output data with no data values
output_data = np.full((src_data.shape[0], dst_data.shape[1], dst_data.shape[2]), -9999.,dtype=np.float32)
# Reproject the source data to match the target geotiff area
for i in range(src_data.shape[0]):
reproject( source=src_data[i], destination=output_data[i], src_transform=src.transform, src_crs=src.crs, dst_transform=dst.transform, dst_crs=dst.crs, resampling=Resampling.nearest)
print(src_data.shape, output_data.shape, dst_data.shape)
# Apply the mask from the target geotiff (where value is -9999)
if not is_rainfall:
output_data[:, dst_data[0] == -9999.] = -9999.
# Write the output data to a new geotiff file
output_meta = dst_meta.copy()
output_meta.update({"dtype": np.float32, "count": src_data.shape[0]})
with rasterio.open(out_path, 'w', **output_meta) as out:
out.write(output_data)
Graph
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
undefined
util.py
from rasterio.warp import reproject, Resampling
None
from scipy import interpolate
import rasterio
import numpy as np
from rasterio.transform import from_origin
def merge_tifs(tif_list, out_path, nodata=-9999.):
''' Merges a list of TIF files into a single TIF file. :param tif_list: list of TIF file paths :param out_path: output TIF file path :param nodata: no-data value :return: '''
# List to store file datasets
files = []
# Variables to store max min coordinates
left, bottom, right, top = None, None, None, None
# Read each TIF file and determine total bounding box
for fp in tif_list:
src = rasterio.open(fp)
files.append(src)
l, b, r, t = src.bounds
left = l if left is None else min(left, l)
bottom = b if bottom is None else min(bottom, b)
right = r if right is None else max(right, r)
top = t if top is None else max(top, t)
height = int((top - bottom) / src.res[1])
width = int((right - left) / src.res[0])
num_bands = src.count
data = np.full((num_bands, height, width), nodata, dtype=np.float32)
# fill with nodata value and use the same data type as source
# Copy data from each TIF into the large array
for src in files: # Determine the position of this TIF in the large array
row_off = int((top - t) / src.res[1])
col_off = int((l - left) / src.res[0])
# Read this TIF's data into a numpy array
for bidx in range(src.count):
arr = src.read(bidx + 1)
# band index starts from 1
# Only copy data that is not the no-data value
mask = (arr != nodata)
# create mask of valid data
# get row, col indices of valid data
rows, cols = np.nonzero(mask)
data[bidx, row_off + rows, col_off + cols] = arr[rows, cols]
# Close the files
for src in files:
src.close()
# Define the metadata
out_meta = src.meta.copy()
out_meta.update({ "driver": "GTiff", "height": height, "width": width, "transform": from_origin(left, top, src.res[0], src.res[1]), "crs": src.crs, "nodata": nodata, "dtype": np.float32 })
# Write the large array to a new TIF
with rasterio.open(out_path, 'w', **out_meta) as dest:
dest.write(data.astype(np.float32))
def interpolate_array(data, xnew, ynew, new_res):
''' Interpolates a 2D array to a new resolution :param data: 2D array :param xnew: new x resolution :param ynew: new y resolution :param new_res: new resolution :return: interpolated 2D array '''
old_shape = data.shape
# Padding the array
pad_x = (new_res - (old_shape[1] % new_res)) % new_res
pad_y = (new_res - (old_shape[0] % new_res)) % new_res
data = np.pad(data, ((0, pad_y), (0, pad_x)))
x = np.linspace(0, old_shape[1] + pad_x, old_shape[1] + pad_x)
y = np.linspace(0, old_shape[0] + pad_y, old_shape[0] + pad_y)
f = interpolate.interp2d(x, y, data, kind='linear')
data = f(xnew, ynew)
# data[data < 0] = 0.
return data
def resample_tif(path_1, path_2, out_path, is_rainfall=False):
''' Resamples a geotiff file to match another geotiff file :param path_1: The source geotiff :param path_2: The target geotiff file :param out_path: The output geotiff file :return: '''
# Read the source geotiff
with rasterio.open(path_1) as src:
src_data = src.read()
src_meta = src.meta
# Read the target geotiff file
with rasterio.open(path_2) as dst:
dst_data = dst.read()
dst_meta = dst.meta
def bbox_contains(bbox1, bbox2):
return bbox1.left >= bbox2.left and bbox1.right <= bbox2.right and bbox1.bottom >= bbox2.bottom and bbox1.top <= bbox2.top
# Check if the bounding boxes overlap and the target geotiff is within source geotiff area
if not bbox_contains(dst.bounds, src.bounds):
raise ValueError('The target GeoTIFF is not entirely within the source GeoTIFF area')
# Initialize the output data with no data values
output_data = np.full((src_data.shape[0], dst_data.shape[1], dst_data.shape[2]), -9999.,dtype=np.float32)
# Reproject the source data to match the target geotiff area
for i in range(src_data.shape[0]):
reproject( source=src_data[i], destination=output_data[i], src_transform=src.transform, src_crs=src.crs, dst_transform=dst.transform, dst_crs=dst.crs, resampling=Resampling.nearest)
print(src_data.shape, output_data.shape, dst_data.shape)
# Apply the mask from the target geotiff (where value is -9999)
if not is_rainfall:
output_data[:, dst_data[0] == -9999.] = -9999.
# Write the output data to a new geotiff file
output_meta = dst_meta.copy()
output_meta.update({"dtype": np.float32, "count": src_data.shape[0]})
with rasterio.open(out_path, 'w', **output_meta) as out:
out.write(output_data)
Search
Train Selection