 import pandas as pd
 import numpy as np
 from numpy.typing import NDArray
-from typing import Dict, List, Literal, Optional, Sequence, Tuple, Union
+from typing import Dict, List, Literal, Optional, Sequence, Tuple, Union, Any
 from loguru import logger
 
+def to_pickle(obj):
+    """Encode an object to a pickle byte stream and store it in a one-row DataFrame."""
+    import pickle
+    import pandas as pd
+    return pd.DataFrame(
+        {"data_type": [type(obj).__name__], "data_content": [pickle.dumps(obj)]}
+    )
+
+def from_pickle(df):
+    """Decode an object from a DataFrame containing a pickle byte stream."""
+    import pickle
+    return pickle.loads(df["data_content"].iloc[0])
+
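
A minimal round-trip sketch for the two helpers above; the payload is illustrative:

    # assumes to_pickle / from_pickle from this diff are in scope
    obj = {"bounds": [-122.5, 37.7, -122.3, 37.9], "zoom": 12}
    encoded = to_pickle(obj)  # one-row DataFrame: type name + pickle bytes
    assert encoded["data_type"].iloc[0] == "dict"
    assert from_pickle(encoded) == obj  # decodes back to the original object
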
+def df_summary(df, description="", n_head=5, n_tail=5, n_sample=5, n_unique=100, add_details=True):
+    val = description + "\n\n"
+    val += "These are stats for df (pd.DataFrame):\n"
+    val += f"{list(df.columns)=}\n\n"
+    val += f"{df.isnull().sum()=}\n\n"
+    val += f"{df.describe().to_json()=}\n\n"
+    val += f"{df.head(n_head).to_json()=}\n\n"
+    val += f"{df.tail(n_tail).to_json()=}\n\n"
+    if len(df) > n_sample:
+        val += f"{df.sample(n_sample).to_json()=}\n\n"
+    if add_details:
+        if len(df) <= n_unique:
+            val += f"{df.to_json()}\n\n"
+        else:
+            for c in df.columns:
+                value_counts = df[c].value_counts()
+                val += f"df[{c}].value_counts()\n{value_counts}\n\n"
+                val += f"{df[c].unique()[:n_unique]}\n\n"
+    return val
+
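
A small usage sketch for df_summary; the toy frame is illustrative:

    import pandas as pd

    toy = pd.DataFrame({"city": ["SF", "NYC", "SF"], "pop_m": [0.8, 8.3, 0.8]})
    # Returns one long string: columns, null counts, describe(), and
    # head/tail JSON; frames longer than n_unique rows also get per-column
    # value_counts and unique values instead of a full dump.
    print(df_summary(toy, description="Toy city table"))
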
+def get_diff_text(text1: str, text2: str, as_html: bool = True, only_diff: bool = False) -> str:
+    import difflib
+    import html
+
+    diff = difflib.ndiff(text1.splitlines(keepends=True), text2.splitlines(keepends=True))
+    processed_diff = []
+
+    if not as_html:
+        for line in diff:
+            if line.startswith("+"):
+                processed_diff.append(f"ADD: {line}")  # Additions
+            elif line.startswith("-"):
+                processed_diff.append(f"DEL: {line}")  # Deletions
+            else:
+                if not only_diff:
+                    processed_diff.append(f" {line}")  # Unchanged lines
+        return "\n".join(processed_diff)
+
+    for line in diff:
+        escaped_line = html.escape(line)  # Escape HTML to preserve special characters
+
+        if line.startswith("+"):
+            processed_diff.append(f"<span style='color:green; line-height:normal;'>{escaped_line}</span><br>")  # Green for additions
+        elif line.startswith("-"):
+            processed_diff.append(f"<span style='color:red; line-height:normal;'>{escaped_line}</span><br>")  # Red for deletions
+        else:
+            if not only_diff:
+                processed_diff.append(f"<span style='color:gray; line-height:normal;'>{escaped_line}</span><br>")  # Gray for unchanged lines
+
+    # HTML structure with a dropdown for selecting the background color
+    html_output = """
+    <div>
+        <label for="backgroundColor" style="color:gray;">Choose Background Color: </label>
+        <select id="backgroundColor" onchange="document.getElementById('diff-container').style.backgroundColor = this.value;">
+            <option value="#111111">Dark Gray</option>
+            <option value="#f0f0f0">Light Gray</option>
+            <option value="#ffffff">White</option>
+            <option value="#e0f7fa">Cyan</option>
+            <option value="#ffebee">Pink</option>
+            <option value="#c8e6c9">Green</option>
+        </select>
+    </div>
+    <div id="diff-container" style="background-color:#111111; padding:10px; font-family:monospace; white-space:pre; line-height:normal;">
+    {}</div>
+    """.format("".join(processed_diff))
+    return html_output
+
+
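
A short usage sketch for get_diff_text; with as_html=False it returns plain text with ADD:/DEL: prefixes, otherwise an embeddable HTML fragment with the background-color picker:

    before = "alpha\nbeta\n"
    after = "alpha\nbeta two\n"
    print(get_diff_text(before, after, as_html=False))  # ADD:/DEL:-prefixed lines
    fragment = get_diff_text(before, after)             # HTML string for embedding
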
 def json_path_from_secret(var='gcs_fused'):
     import json
     import tempfile
@@ -1106,7 +1188,7 @@ def df_to_gdf(df, cols_lonlat=None, verbose=False):
     return df
 
 
-def to_gdf(
+def geo_convert(
     data,
     crs=None,
     cols_lonlat=None,
@@ -1119,6 +1201,13 @@ def to_gdf(
     import pandas as pd
     import mercantile
 
+    # Convert an {'x', 'y', 'z'} tile dict to an [x, y, z] list
+    if isinstance(data, dict) and set(data.keys()) == {'x', 'y', 'z'}:
+        try:
+            data = [int(data['x']), int(data['y']), int(data['z'])]
+        except (ValueError, TypeError):
+            pass
+
     # Handle the bounds case specifically
     if data is None or (isinstance(data, (list, tuple, np.ndarray)) and len(data) == 4):
         bounds = [-180, -90, 180, 90] if data is None else data
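
Given the new branch above, an {'x', 'y', 'z'} tile dict is normalized to an [x, y, z] list before the usual conversion paths run, so the two sketched calls below should be equivalent (tile values are illustrative):

    tile_gdf_a = geo_convert({"x": 1308, "y": 3167, "z": 13})
    tile_gdf_b = geo_convert((1308, 3167, 13))
    # Both should resolve to the same mercantile tile geometry; a 4-element
    # list/tuple is still treated as (minx, miny, maxx, maxy) bounds instead.
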
@@ -1236,7 +1325,7 @@ def geo_buffer(
     assert data.crs not in (
         None,
         "",
-    ), "no crs was not found. use to_gdf to add crs"
+    ), "no crs found; use geo_convert to add one"
     if str(dst_crs).lower().replace("_", "").replace(" ", "").replace("-", "") in [
         "original",
         "originalcrs",
@@ -1282,15 +1371,15 @@ def geo_bbox(
     import pyproj
     src_crs = data.crs
     if not dst_crs:
-        return to_gdf(
+        return geo_convert(
             shapely.geometry.box(*data.total_bounds), crs=src_crs, verbose=verbose
         )
     elif str(dst_crs).lower() == "utm":
         dst_crs = data.estimate_utm_crs()
         logger.debug(f"estimated dst_crs={crs_display(dst_crs)}")
     transformer = pyproj.Transformer.from_crs(src_crs, dst_crs, always_xy=True)
     dst_bounds = transformer.transform_bounds(*data.total_bounds)
-    return to_gdf(
+    return geo_convert(
         shapely.geometry.box(*dst_bounds, ccw=True), crs=dst_crs, verbose=verbose
     )
 
@@ -1365,9 +1454,9 @@ def geo_join(
     import geopandas as gpd
     import shapely
     if type(left) != gpd.GeoDataFrame:
-        left = to_gdf(left, verbose=verbose)
+        left = geo_convert(left, verbose=verbose)
     if type(right) != gpd.GeoDataFrame:
-        right = to_gdf(right, verbose=verbose)
+        right = geo_convert(right, verbose=verbose)
     left_geom_cols = get_geo_cols(left)
     right_geom_cols = get_geo_cols(right)
     if verbose:
@@ -1483,9 +1572,9 @@ def geo_distance(
     import geopandas as gpd
     import shapely
     if type(left) != gpd.GeoDataFrame:
-        left = to_gdf(left, verbose=verbose)
+        left = geo_convert(left, verbose=verbose)
     if type(right) != gpd.GeoDataFrame:
-        right = to_gdf(right, verbose=verbose)
+        right = geo_convert(right, verbose=verbose)
     left_geom_cols = get_geo_cols(left)
     right_geom_cols = get_geo_cols(right)
     cols_right = list(cols_right)
@@ -1554,7 +1643,7 @@ def geo_samples(
         (random.uniform(min_x, max_x), random.uniform(min_y, max_y))
         for _ in range(n_samples)
     ]
-    return to_gdf(pd.DataFrame(points, columns=["lng", "lat"]))[["geometry"]]
+    return geo_convert(pd.DataFrame(points, columns=["lng", "lat"]))[["geometry"]]
 
 
 def bbox_stac_items(bounds, table):
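
A usage sketch for the renamed call in geo_samples, passing the bounds by keyword since only the parameter names are visible in this hunk:

    samples = geo_samples(n_samples=100, min_x=-122.5, min_y=37.7, max_x=-122.3, max_y=37.9)
    # -> GeoDataFrame with a single `geometry` column of 100 random points,
    #    built by geo_convert from the generated lng/lat pairs
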
@@ -1989,7 +2078,7 @@ def mercantile_polyfill(geom, zooms=[15], compact=True, k=None):
     import mercantile
     import shapely
 
-    gdf = to_gdf(geom, crs=4326)
+    gdf = geo_convert(geom, crs=4326)
     geometry = gdf.geometry[0]
 
     tile_list = list(mercantile.tiles(*geometry.bounds, zooms=zooms))
@@ -2635,7 +2724,7 @@ def estimate_zoom(bounds, target_num_tiles=1):
     return zoom + 1
 
 
-def get_tile(
+def get_tiles(
     bounds=None, target_num_tiles=1, zoom=None, max_tile_recursion=6, as_gdf=True
 ):
     import mercantile
@@ -2648,9 +2737,9 @@ def get_tile(
         raise ValueError("target_num_tiles should be more than zero.")
 
     if target_num_tiles == 1:
-        bounds = to_gdf(bounds)
+        bounds = geo_convert(bounds)
         tile = mercantile.bounding_tile(*bounds.total_bounds)
-        gdf = to_gdf((tile.x, tile.y, tile.z))
+        gdf = geo_convert((tile.x, tile.y, tile.z))
     else:
         zoom_level = (
             zoom
@@ -2674,3 +2763,93 @@ def get_tile(
 
     return gdf if as_gdf else gdf[["x", "y", "z"]].values
 
+def get_utm_epsg(geometry):
+    utm_zone = int((geometry.centroid.x + 180) / 6) + 1
+    # 326XX for the Northern Hemisphere, 327XX for the Southern
+    return 32600 + utm_zone if geometry.centroid.y >= 0 else 32700 + utm_zone
+
+
+def add_utm_area(gdf, utm_col='utm_epsg', utm_area_col='utm_area_sqm'):
+    import geopandas as gpd
+
+    # Step 1: Compute UTM zones
+    gdf[utm_col] = gdf.geometry.apply(get_utm_epsg)
+
+    # Step 2: Compute areas in batches while preserving order
+    areas_dict = {}
+
+    for utm_zone, group in gdf.groupby(utm_col, group_keys=False):
+        utm_crs = f"EPSG:{utm_zone}"
+        reprojected = group.to_crs(utm_crs)  # Reproject all geometries in batch
+        areas_dict.update(dict(zip(group.index, reprojected.area)))  # Store areas by index
+
+    # Step 3: Assign areas back to original gdf order
+    gdf[utm_area_col] = gdf.index.map(areas_dict)
+    return gdf
+
+
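
A usage sketch for the two helpers above; the polygon is illustrative. get_utm_epsg derives the WGS84/UTM EPSG code from the centroid, and add_utm_area reprojects each UTM-zone group only once:

    import geopandas as gpd
    import shapely.geometry

    poly = shapely.geometry.box(-122.5, 37.7, -122.3, 37.9)  # lon/lat box near SF
    gdf = gpd.GeoDataFrame(geometry=[poly], crs=4326)
    gdf = add_utm_area(gdf)
    print(gdf[["utm_epsg", "utm_area_sqm"]])  # e.g. 32610 plus area in square meters
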
+def run_submit_with_defaults(udf_token: str, cache_length: str = "9999d", default_params_token: Optional[str] = None):
+    """
+    Uses fused.submit() to run a UDF over:
+    - a UDF that returns a pd.DataFrame of test arguments (`default_params_token`), or
+    - default params (expects udf.utils.submit_default_params to return a pd.DataFrame).
+    """
+
+    # Assume people know what they're doing
+    try:
+        # default_params_token is a UDF that returns a pd.DataFrame of test arguments
+        arg_list = fused.run(default_params_token)
+
+        if 'bounds' in arg_list.columns:
+            # Hacky workaround for now: we can't pass np.float bounds to `fused.run(udf, bounds)`,
+            # so convert them to plain floats (fused.run() returns bounds as np.float)
+            arg_list['bounds'] = arg_list['bounds'].apply(lambda bounds_list: [float(x) for x in bounds_list])
+
+        print(f"Loaded default params from UDF {default_params_token}... Running UDF over these")
+    except Exception as e:
+        print(f"Couldn't load UDF {udf_token} with default_params_token {default_params_token}, trying to load default params...")
+
+        try:
+            udf = fused.load(udf_token)
+
+            # Assume the main UDF has a function called `submit_default_params`
+            # that returns a pd.DataFrame of test arguments
+            # TODO: edit this to directly use `udf.submit_default_params()` once we remove utils
+            if hasattr(udf.utils, "submit_default_params"):
+                print("Found default params for UDF, using them...")
+                arg_list = udf.utils.submit_default_params()
+            else:
+                raise ValueError("No default params found for UDF, can't run this UDF")
+
+        except Exception as e:
+            raise ValueError("Couldn't load UDF, can't run this UDF. Try with another UDF")
+
+    # TODO: Add support for using the default view state
+
+    return fused.submit(
+        udf_token,
+        arg_list,
+        cache_max_age=cache_length,
+        wait_on_results=True,
+    )
+
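
A hedged call sketch for run_submit_with_defaults; the token strings are placeholders and a configured `fused` client is assumed:

    results = run_submit_with_defaults(
        udf_token="<your-udf-token>",                      # placeholder
        cache_length="9999d",
        default_params_token="<udf-returning-params-df>",  # placeholder
    )
    # `results` is the fused.submit() output (one row per argument set)
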
+def test_udf(udf_token: str, cache_length: str = "9999d", arg_token: Optional[str] = None):
+    """
+    Test a UDF:
+    1. Does it run and return a successful result for all its default parameters?
+    2. Are the results identical to the cached results?
+
+    Returns:
+    - all_passing: True if the UDF runs and returns a successful result for all its default parameters
+    - all_equal: True if the results are identical to the cached results
+    - cached_run: Cached UDF output
+    - current_run: New UDF output
+    """
+    import pickle
+
+    cached_run = run_submit_with_defaults(udf_token, cache_length, arg_token)
+    current_run = run_submit_with_defaults(udf_token, "0s", arg_token)
+
+    # Check if results are valid
+    all_passing = (current_run["status"] == "success").all()
+    # Check if results match the cached results
+    all_equal = pickle.dumps(cached_run) == pickle.dumps(current_run)
+    return (bool(all_passing), all_equal, cached_run, current_run)
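
And test_udf in use, under the same assumptions (valid token, configured fused client):

    all_passing, all_equal, cached_run, current_run = test_udf("<your-udf-token>")
    if not all_passing:
        print("Some default-parameter runs failed:", current_run["status"].value_counts())
    elif not all_equal:
        print("UDF output drifted from the cached baseline")
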