import pandas as pd
import numpy as np
from numpy.typing import NDArray
- from typing import Dict, List, Literal, Optional, Sequence, Tuple, Union, Any
+ from typing import Dict, List, Literal, Optional, Sequence, Tuple, Union
from loguru import logger

- def to_pickle(obj):
-     """Encode an object to a pickle byte stream and store in DataFrame."""
-     import pickle
-     import pandas as pd
-     return pd.DataFrame(
-         {"data_type": [type(obj).__name__], "data_content": [pickle.dumps(obj)]}
-     )
-
- def from_pickle(df):
-     """Decode an object from a DataFrame containing pickle byte stream."""
-     import pickle
-     return pickle.loads(df["data_content"].iloc[0])
-
- def df_summary(df, description="", n_head=5, n_tail=5, n_sample=5, n_unique=100, add_details=True):
-     val = description + "\n\n"
-     val += "These are stats for df (pd.DataFrame):\n"
-     val += f"{list(df.columns)=}\n\n"
-     val += f"{df.isnull().sum()=}\n\n"
-     val += f"{df.describe().to_json()=}\n\n"
-     val += f"{df.head(n_head).to_json()=}\n\n"
-     val += f"{df.tail(n_tail).to_json()=}\n\n"
-     if len(df) > n_sample:
-         val += f"{df.sample(n_sample).to_json()=}\n\n"
-     if add_details:
-         if len(df) <= n_unique:
-             val += f"{df.to_json()}\n\n"
-         else:
-             for c in df.columns:
-                 value_counts = df[c].value_counts()
-                 df[c].value_counts().head()
-                 val += f"df[{c}].value_counts()\n{value_counts}\n\n"
-                 val += f"{df[c].unique()[:n_unique]}\n\n"
-     return val
-
- def get_diff_text(text1: str, text2: str, as_html: bool = True, only_diff: bool = False) -> str:
-     import difflib
-     import html
-
-     diff = difflib.ndiff(text1.splitlines(keepends=True), text2.splitlines(keepends=True))
-     processed_diff = []
-
-     if not as_html:
-         for line in diff:
-             if line.startswith("+"):
-                 processed_diff.append(f"ADD: {line}")  # Additions
-             elif line.startswith("-"):
-                 processed_diff.append(f"DEL: {line}")  # Deletions
-             else:
-                 if not only_diff:
-                     processed_diff.append(f" {line}")  # Unchanged lines
-         return "\n".join(processed_diff)
-
-     for line in diff:
-         escaped_line = html.escape(line)  # Escape HTML to preserve special characters
-
-         if line.startswith("+"):
-             processed_diff.append(f"<span style='color:green; line-height:normal;'>{escaped_line}</span><br>")  # Green for additions
-         elif line.startswith("-"):
-             processed_diff.append(f"<span style='color:red; line-height:normal;'>{escaped_line}</span><br>")  # Red for deletions
-         else:
-             if not only_diff:
-                 processed_diff.append(f"<span style='color:gray; line-height:normal;'>{escaped_line}</span><br>")  # Gray for unchanged lines
-
-     # HTML structure with a dropdown for selecting background color
-     html_output = """
-     <div>
-         <label for="backgroundColor" style="color:gray;">Choose Background Color: </label>
-         <select id="backgroundColor" onchange="document.getElementById('diff-container').style.backgroundColor = this.value;">
-             <option value="#111111">Dark Gray</option>
-             <option value="#f0f0f0">Light Gray</option>
-             <option value="#ffffff">White</option>
-             <option value="#e0f7fa">Cyan</option>
-             <option value="#ffebee">Pink</option>
-             <option value="#c8e6c9">Green</option>
-         </select>
-     </div>
-     <div id="diff-container" style="background-color:#111111; padding:10px; font-family:monospace; white-space:pre; line-height:normal;">
-     {}</div>
-     """.format("".join(processed_diff))
-     return html_output
-
-
def json_path_from_secret(var='gcs_fused'):
    import json
    import tempfile
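For context on what the first removed block did, here is a minimal round-trip sketch of the to_pickle/from_pickle pattern. It is an illustration reconstructed from the removed helpers above (same 'data_type'/'data_content' schema), not part of this diff:

    import pickle
    import pandas as pd

    obj = {"answer": 42}
    # Encode: one-row DataFrame holding the type name and the pickled bytes
    df = pd.DataFrame({"data_type": [type(obj).__name__], "data_content": [pickle.dumps(obj)]})
    # Decode: read the byte stream back out of the first row
    restored = pickle.loads(df["data_content"].iloc[0])
    assert restored == obj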
@@ -1188,7 +1106,7 @@ def df_to_gdf(df, cols_lonlat=None, verbose=False):
    return df


- def geo_convert(
+ def to_gdf(
    data,
    crs=None,
    cols_lonlat=None,
@@ -1201,13 +1119,6 @@ def geo_convert(
    import pandas as pd
    import mercantile

-     # Convert xyz dict to xyz array
-     if isinstance(data, dict) and set(data.keys()) == {'x', 'y', 'z'}:
-         try:
-             data = [int(data['x']), int(data['y']), int(data['z'])]
-         except (ValueError, TypeError):
-             pass
-
    # Handle the bounds case specifically
    if data is None or (isinstance(data, (list, tuple, np.ndarray)) and len(data) == 4):
        bounds = [-180, -90, 180, 90] if data is None else data
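The rename touches every call site further down this diff. For orientation, these are the call shapes that appear in the surrounding hunks, with illustrative input values; it assumes to_gdf keeps geo_convert's behaviour of returning a GeoDataFrame:

    import pandas as pd
    import shapely.geometry

    # Illustrative inputs for the call shapes seen in this diff
    bounds = [-122.5, 37.7, -122.3, 37.9]
    geom = shapely.geometry.box(*bounds)
    points = [(-122.4, 37.8), (-122.35, 37.75)]

    gdf = to_gdf(None)                      # None falls back to world bounds [-180, -90, 180, 90]
    gdf = to_gdf(bounds, crs=4326)          # 4-element bounds sequence
    gdf = to_gdf(geom, crs=4326)            # shapely geometry, as in mercantile_polyfill
    gdf = to_gdf(pd.DataFrame(points, columns=["lng", "lat"]))  # lon/lat points, as in geo_samples

Note that the removed block above means an {'x', 'y', 'z'} dict is no longer coerced to an x/y/z list; tile input is passed as a tuple, as in get_tile below.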
@@ -1325,7 +1236,7 @@ def geo_buffer(
    assert data.crs not in (
        None,
        "",
-     ), "no crs was not found. use geo_convert to add crs"
+     ), "no crs was not found. use to_gdf to add crs"
    if str(dst_crs).lower().replace("_", "").replace(" ", "").replace("-", "") in [
        "original",
        "originalcrs",
@@ -1371,15 +1282,15 @@ def geo_bbox(
    import pyproj
    src_crs = data.crs
    if not dst_crs:
-         return geo_convert(
+         return to_gdf(
            shapely.geometry.box(*data.total_bounds), crs=src_crs, verbose=verbose
        )
    elif str(dst_crs).lower() == "utm":
        dst_crs = data.estimate_utm_crs()
        logger.debug(f"estimated dst_crs={crs_display(dst_crs)}")
    transformer = pyproj.Transformer.from_crs(src_crs, dst_crs, always_xy=True)
    dst_bounds = transformer.transform_bounds(*data.total_bounds)
-     return geo_convert(
+     return to_gdf(
        shapely.geometry.box(*dst_bounds, ccw=True), crs=dst_crs, verbose=verbose
    )
@@ -1454,9 +1365,9 @@ def geo_join(
    import geopandas as gpd
    import shapely
    if type(left) != gpd.GeoDataFrame:
-         left = geo_convert(left, verbose=verbose)
+         left = to_gdf(left, verbose=verbose)
    if type(right) != gpd.GeoDataFrame:
-         right = geo_convert(right, verbose=verbose)
+         right = to_gdf(right, verbose=verbose)
    left_geom_cols = get_geo_cols(left)
    right_geom_cols = get_geo_cols(right)
    if verbose:
@@ -1572,9 +1483,9 @@ def geo_distance(
    import geopandas as gpd
    import shapely
    if type(left) != gpd.GeoDataFrame:
-         left = geo_convert(left, verbose=verbose)
+         left = to_gdf(left, verbose=verbose)
    if type(right) != gpd.GeoDataFrame:
-         right = geo_convert(right, verbose=verbose)
+         right = to_gdf(right, verbose=verbose)
    left_geom_cols = get_geo_cols(left)
    right_geom_cols = get_geo_cols(right)
    cols_right = list(cols_right)
@@ -1643,7 +1554,7 @@ def geo_samples(
        (random.uniform(min_x, max_x), random.uniform(min_y, max_y))
        for _ in range(n_samples)
    ]
-     return geo_convert(pd.DataFrame(points, columns=["lng", "lat"]))[["geometry"]]
+     return to_gdf(pd.DataFrame(points, columns=["lng", "lat"]))[["geometry"]]


def bbox_stac_items(bounds, table):
@@ -2078,7 +1989,7 @@ def mercantile_polyfill(geom, zooms=[15], compact=True, k=None):
    import mercantile
    import shapely

-     gdf = geo_convert(geom, crs=4326)
+     gdf = to_gdf(geom, crs=4326)
    geometry = gdf.geometry[0]

    tile_list = list(mercantile.tiles(*geometry.bounds, zooms=zooms))
@@ -2724,7 +2635,7 @@ def estimate_zoom(bounds, target_num_tiles=1):
    return zoom + 1


- def get_tiles(
+ def get_tile(
    bounds=None, target_num_tiles=1, zoom=None, max_tile_recursion=6, as_gdf=True
):
    import mercantile
@@ -2737,9 +2648,9 @@ def get_tiles(
        raise ValueError("target_num_tiles should be more than zero.")

    if target_num_tiles == 1:
-         bounds = geo_convert(bounds)
+         bounds = to_gdf(bounds)
        tile = mercantile.bounding_tile(*bounds.total_bounds)
-         gdf = geo_convert((tile.x, tile.y, tile.z))
+         gdf = to_gdf((tile.x, tile.y, tile.z))
    else:
        zoom_level = (
            zoom
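The single-tile branch above relies on mercantile.bounding_tile, which returns the smallest web-mercator tile that fully contains a lon/lat bounding box. A small standalone sketch with illustrative values, assuming mercantile is installed:

    import mercantile

    # Smallest XYZ tile fully containing a west/south/east/north box,
    # as in the target_num_tiles == 1 branch above
    bounds = (-122.5, 37.7, -122.3, 37.9)
    tile = mercantile.bounding_tile(*bounds)
    print(tile.x, tile.y, tile.z)  # tile indices that to_gdf((x, y, z)) turns into a GeoDataFrame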
@@ -2763,93 +2674,3 @@ def get_tiles(

    return gdf if as_gdf else gdf[["x", "y", "z"]].values

- def get_utm_epsg(geometry):
-     utm_zone = int((geometry.centroid.x + 180) / 6) + 1
-     return 32600 + utm_zone if geometry.centroid.y >= 0 else 32700 + utm_zone  # 326XX for Northern Hemisphere, 327XX for Southern
-
-
- def add_utm_area(gdf, utm_col='utm_epsg', utm_area_col='utm_area_sqm'):
-     import geopandas as gpd
-
-     # Step 1: Compute UTM zones
-     gdf[utm_col] = gdf.geometry.apply(get_utm_epsg)
-
-     # Step 2: Compute areas in batches while preserving order
-     areas_dict = {}
-
-     for utm_zone, group in gdf.groupby(utm_col, group_keys=False):
-         utm_crs = f"EPSG:{utm_zone}"
-         reprojected = group.to_crs(utm_crs)  # Reproject all geometries in batch
-         areas_dict.update(dict(zip(group.index, reprojected.area)))  # Store areas by index
-
-     # Step 3: Assign areas back to original gdf order
-     gdf[utm_area_col] = gdf.index.map(areas_dict)
-     return gdf
-
-
- def run_submit_with_defaults(udf_token: str, cache_length: str = "9999d", default_params_token: Optional[str] = None):
-     """
-     Uses fused.submit() to run a UDF over:
-     - A UDF that returns a pd.DataFrame of test arguments (`default_params_token`)
-     - Or default params (expects udf.utils.submit_default_params to return a pd.DataFrame)
-     """
-
-     # Assume people know what they're doing
-     try:
-         # arg_token is a UDF that returns a pd.DataFrame of test arguments
-         arg_list = fused.run(default_params_token)
-
-         if 'bounds' in arg_list.columns:
-             # This is a hacky workaround for now: we can't pass np.float bounds to `fused.run(udf, bounds)`,
-             # so we convert them to float, but fused.run() returns bounds as `np.float` for whatever reason
-             arg_list['bounds'] = arg_list['bounds'].apply(lambda bounds_list: [float(x) for x in bounds_list])
-
-         print(f"Loaded default params from UDF {default_params_token}... Running UDF over these")
-     except Exception as e:
-         print(f"Couldn't load UDF {udf_token} with arg_token {default_params_token}, trying to load default params...")
-
-         try:
-             udf = fused.load(udf_token)
-
-             # Assume we have a function called `submit_default_params` inside the main UDF which returns a pd.DataFrame of test arguments
-             # TODO: edit this to directly use `udf.submit_default_params()` once we remove utils
-             if hasattr(udf.utils, "submit_default_params"):
-                 print("Found default params for UDF, using them...")
-                 arg_list = udf.utils.submit_default_params()
-             else:
-                 raise ValueError("No default params found for UDF, can't run this UDF")
-
-         except Exception as e:
-             raise ValueError("Couldn't load UDF, can't run this UDF. Try with another UDF")
-
-     # TODO: Add support for using the default view state
-
-     return fused.submit(
-         udf_token,
-         arg_list,
-         cache_max_age=cache_length,
-         wait_on_results=True,
-     )
-
- def test_udf(udf_token: str, cache_length: str = "9999d", arg_token: Optional[str] = None):
-     """
-     Testing a UDF:
-     1. Does it run and return a successful result for all its default parameters?
-     2. Are the results identical to the cached results?
-
-     Returns:
-     - all_passing: True if the UDF runs and returns a successful result for all its default parameters
-     - all_equal: True if the results are identical to the cached results
-     - cached_run: Cached UDF output
-     - current_run: New UDF output
-     """
-     import pickle
-
-     cached_run = run_submit_with_defaults(udf_token, cache_length, arg_token)
-     current_run = run_submit_with_defaults(udf_token, "0s", arg_token)
-
-     # Check if results are valid
-     all_passing = (current_run["status"] == "success").all()
-     # Check if result matches cached result
-     all_equal = pickle.dumps(cached_run) == pickle.dumps(current_run)
-     return (bool(all_passing), all_equal, cached_run, current_run)
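For readers unfamiliar with the removed test helpers: the flow was to resolve an argument list (from a params UDF, or from udf.utils.submit_default_params), fan the UDF out with fused.submit, and then compare a long-cached run against a fresh run. A hedged usage sketch of the removed test_udf, assuming it is still importable from an older revision; the token string is a placeholder, and the "status" column comes from the fused.submit output used in the removed code:

    # Hypothetical usage of the removed test_udf helper
    all_passing, all_equal, cached_run, current_run = test_udf("UDF_TOKEN_HERE")

    if not all_passing:
        # Inspect the jobs that did not return a successful status
        print(current_run[current_run["status"] != "success"])
    if not all_equal:
        print("Output changed versus the cached run")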