The main purpose of this script is to process geospatial raster data (a TIFF file) together with disaster point data (a CSV file), count how the disaster points are distributed across the partitions, and compute statistics such as the relative disaster density.
TIFF file: a raster that has already been reclassified
CSV file: longitude/latitude coordinates of the disaster points
Adjust the code below to match your actual number of classes
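The key statistic is the relative density: for partition i, relative_density = (D_i / D_total) / (S_i / S_total), where D_i is the number of disaster points falling in partition i and S_i its pixel count. A value above 1 means the partition attracts more than its areal share of disaster points. As an illustrative example (hypothetical numbers), a partition covering 20% of the valid pixels but containing 40% of the points has a relative density of 0.40 / 0.20 = 2.0.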
import numpy as np
import pandas as pd
from osgeo import gdal
from typing import Dict, Tuple, List


def read_tif_file(filepath: str) -> Tuple:
    """Read a TIFF file and mask NoData values as NaN."""
    dataset = gdal.Open(filepath)
    if dataset is None:
        raise ValueError("Failed to open the TIFF file")

    # Basic metadata
    projection = dataset.GetProjection()
    transform = dataset.GetGeoTransform()
    x_size = dataset.RasterXSize
    y_size = dataset.RasterYSize
    bands = dataset.RasterCount

    # Only the first band is processed (adjust if needed)
    band = dataset.GetRasterBand(1)
    nodata = band.GetNoDataValue()
    arr = band.ReadAsArray(0, 0, x_size, y_size).astype(np.float32)

    # Mask NoData values
    if nodata is not None:
        arr[arr == nodata] = np.nan

    return projection, transform, arr, x_size, y_size, bands


def disaster_point_optimized(csv_filepath: str, tif_filepath: str) -> Tuple[List[List[int]], int]:
    """Convert disaster point lon/lat coordinates to raster row/column indices."""
    try:
        # Read the disaster point table
        disaster_df = pd.read_csv(csv_filepath, encoding='ISO-8859-1')
        required_cols = ['JD', 'WD']
        if not all(col in disaster_df.columns for col in required_cols):
            raise ValueError(f"The CSV file must contain the columns {required_cols}")

        # Geotransform parameters of the raster
        tif_data = read_tif_file(tif_filepath)
        transform = tif_data[1]
        x_size = tif_data[3]
        y_size = tif_data[4]

        # Coordinate conversion (assumes a north-up raster)
        longitude = disaster_df['JD'].values
        latitude = disaster_df['WD'].values
        x_origin = transform[0]
        y_origin = transform[3]
        x_res = transform[1]
        y_res = abs(transform[5])

        # Row/column indices
        col_indices = ((longitude - x_origin) / x_res).astype(int)
        row_indices = ((y_origin - latitude) / y_res).astype(int)

        # Clamp to the raster extent; note that points outside the raster
        # are snapped to edge pixels rather than dropped
        col_indices = np.clip(col_indices, 0, x_size - 1)
        row_indices = np.clip(row_indices, 0, y_size - 1)

        return [[int(row), int(col)] for row, col in zip(row_indices, col_indices)], len(row_indices)
    except Exception as e:
        print(f"Processing error: {e}")
        return [], 0


def count_partition_statistics(tif_filepath: str, csv_filepath: str) -> Dict[int, Dict[str, float]]:
    """Compute per-partition statistics (with NoData/outlier filtering)."""
    # Disaster point raster indices
    disaster_indices, total_points = disaster_point_optimized(csv_filepath, tif_filepath)

    # Raster data
    tif_data = read_tif_file(tif_filepath)
    data_1 = tif_data[2]
    x_size = tif_data[3]
    y_size = tif_data[4]

    # Valid-value filtering (NoData and outliers)
    valid_mask = ~np.isnan(data_1)
    data_1_int = np.full(data_1.shape, -9999, dtype=int)  # sentinel for invalid cells
    data_1_int[valid_mask] = data_1[valid_mask].astype(int)

    # Pixel counts for the valid partitions (1-5; adjust to your class count)
    valid_values = (data_1_int >= 1) & (data_1_int <= 5)
    unique_values, counts = np.unique(data_1_int[valid_values], return_counts=True)
    pixel_counts = dict(zip(unique_values, counts))

    # Disaster point counts per partition
    disaster_counts = {}
    for row, col in disaster_indices:
        if 0 <= row < y_size and 0 <= col < x_size:
            value = data_1_int[row, col]
            if 1 <= value <= 5:  # only count valid partitions
                disaster_counts[value] = disaster_counts.get(value, 0) + 1

    # Totals over all valid partitions
    S_total = sum(pixel_counts.values())
    D_total = sum(disaster_counts.values())

    # Result dictionary; every partition 1-5 gets an entry
    result = {}
    for value in range(1, 6):
        pc = pixel_counts.get(value, 0)
        dc = disaster_counts.get(value, 0)
        # Relative density
        if D_total == 0 or pc == 0:
            rd = 0.0
        else:
            rd = (dc / D_total) / (pc / S_total)
        result[value] = {
            'pixel_count': pc,
            'disaster_count': dc,
            'relative_density': round(rd, 4)
        }
    return result


if __name__ == "__main__":
    tif_path = r''
    csv_path = r''
    stats = count_partition_statistics(tif_path, csv_path)

    # Formatted output
    print("Partition\tPixels\tDisaster points\tRelative density")
    for value in sorted(stats.keys()):
        data = stats[value]
        print(f"{value}\t{data['pixel_count']}\t{data['disaster_count']}\t{data['relative_density']:.4f}")
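The coordinate conversion above assumes a north-up raster, i.e. that the rotation terms transform[2] and transform[4] are zero. If the raster may be rotated, a minimal sketch of a more general conversion (using GDAL's gdal.InvGeoTransform and gdal.ApplyGeoTransform helpers; lonlat_to_rowcol is a hypothetical name) would be:

from osgeo import gdal

def lonlat_to_rowcol(transform, lon, lat):
    # Invert the geotransform; returns None if it is not invertible
    inv = gdal.InvGeoTransform(transform)
    if inv is None:
        raise ValueError("Geotransform is not invertible")
    # Map geographic coordinates back to pixel space
    px, py = gdal.ApplyGeoTransform(inv, lon, lat)
    return int(py), int(px)  # (row, col)

A drop-in replacement would still need the same bounds clamping as the vectorized version above.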
Result:
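Running the script prints one tab-separated row per partition; the counts below are placeholders, since the actual values depend on your data:

Partition	Pixels	Disaster points	Relative density
1	…	…	…
…	…	…	…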