Code fragments of util.py

Graph

undefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedutil.py from rasterio.warp import reproject, Resampling None from scipy import interpolate import rasterio import numpy as np from rasterio.transform import from_origin def merge_tifs(tif_list, out_path, nodata=-9999.): ''' Merges a list of TIF files into a single TIF file. :param tif_list: list of TIF file paths :param out_path: output TIF file path :param nodata: no-data value :return: ''' # List to store file datasets files = [] # Variables to store max min coordinates left, bottom, right, top = None, None, None, None # Read each TIF file and determine total bounding box for fp in tif_list: src = rasterio.open(fp) files.append(src) l, b, r, t = src.bounds left = l if left is None else min(left, l) bottom = b if bottom is None else min(bottom, b) right = r if right is None else max(right, r) top = t if top is None else max(top, t) height = int((top - bottom) / src.res[1]) width = int((right - left) / src.res[0]) num_bands = src.count data = np.full((num_bands, height, width), nodata, dtype=np.float32) # fill with nodata value and use the same data type as source # Copy data from each TIF into the large array for src in files: # Determine the position of this TIF in the large array row_off = int((top - t) / src.res[1]) col_off = int((l - left) / src.res[0]) # Read this TIF's data into a numpy array for bidx in range(src.count): arr = src.read(bidx + 1) # band index starts from 1 # Only copy data that is not the no-data value mask = (arr != nodata) # create mask of valid data # get row, col indices of valid data rows, cols = np.nonzero(mask) data[bidx, row_off + rows, col_off + cols] = arr[rows, cols] # Close the files for src in files: src.close() # Define the metadata out_meta = src.meta.copy() out_meta.update({ "driver": "GTiff", "height": height, "width": width, "transform": from_origin(left, top, src.res[0], src.res[1]), "crs": src.crs, "nodata": nodata, "dtype": np.float32 }) # Write the large array to a new TIF with rasterio.open(out_path, 'w', **out_meta) as dest: dest.write(data.astype(np.float32)) def interpolate_array(data, xnew, ynew, new_res): ''' Interpolates a 2D array to a new resolution :param data: 2D array :param xnew: new x resolution :param ynew: new y resolution :param new_res: new resolution :return: interpolated 2D array ''' old_shape = data.shape # Padding the array pad_x = (new_res - (old_shape[1] % new_res)) % new_res pad_y = (new_res - (old_shape[0] % new_res)) % new_res data = np.pad(data, ((0, pad_y), (0, pad_x))) x = np.linspace(0, old_shape[1] + pad_x, old_shape[1] + pad_x) y = np.linspace(0, old_shape[0] + pad_y, old_shape[0] + pad_y) f = interpolate.interp2d(x, y, data, kind='linear') data = f(xnew, ynew) # data[data < 0] = 0. return data def resample_tif(path_1, path_2, out_path, is_rainfall=False): ''' Resamples a geotiff file to match another geotiff file :param path_1: The source geotiff :param path_2: The target geotiff file :param out_path: The output geotiff file :return: ''' # Read the source geotiff with rasterio.open(path_1) as src: src_data = src.read() src_meta = src.meta # Read the target geotiff file with rasterio.open(path_2) as dst: dst_data = dst.read() dst_meta = dst.meta def bbox_contains(bbox1, bbox2): return bbox1.left >= bbox2.left and bbox1.right <= bbox2.right and bbox1.bottom >= bbox2.bottom and bbox1.top <= bbox2.top # Check if the bounding boxes overlap and the target geotiff is within source geotiff area if not bbox_contains(dst.bounds, src.bounds): raise ValueError('The target GeoTIFF is not entirely within the source GeoTIFF area') # Initialize the output data with no data values output_data = np.full((src_data.shape[0], dst_data.shape[1], dst_data.shape[2]), -9999.,dtype=np.float32) # Reproject the source data to match the target geotiff area for i in range(src_data.shape[0]): reproject( source=src_data[i], destination=output_data[i], src_transform=src.transform, src_crs=src.crs, dst_transform=dst.transform, dst_crs=dst.crs, resampling=Resampling.nearest) print(src_data.shape, output_data.shape, dst_data.shape) # Apply the mask from the target geotiff (where value is -9999) if not is_rainfall: output_data[:, dst_data[0] == -9999.] = -9999. # Write the output data to a new geotiff file output_meta = dst_meta.copy() output_meta.update({"dtype": np.float32, "count": src_data.shape[0]}) with rasterio.open(out_path, 'w', **output_meta) as out: out.write(output_data)