24
24
import com .google .errorprone .annotations .FormatString ;
25
25
import java .io .IOException ;
26
26
import java .util .Collections ;
27
+ import java .util .HashMap ;
27
28
import java .util .List ;
28
29
import java .util .Map ;
29
30
import java .util .function .Consumer ;
37
38
import org .apache .gravitino .Catalog ;
38
39
import org .apache .gravitino .client .GravitinoMetalake ;
39
40
import org .apache .gravitino .flink .connector .PropertiesConverter ;
41
+ import org .apache .gravitino .flink .connector .iceberg .IcebergPropertiesConstants ;
40
42
import org .apache .gravitino .flink .connector .integration .test .utils .TestUtils ;
41
43
import org .apache .gravitino .flink .connector .store .GravitinoCatalogStoreFactoryOptions ;
42
44
import org .apache .gravitino .integration .test .container .ContainerSuite ;
43
45
import org .apache .gravitino .integration .test .container .HiveContainer ;
44
46
import org .apache .gravitino .integration .test .util .BaseIT ;
47
+ import org .apache .gravitino .server .web .JettyServerConfig ;
45
48
import org .apache .hadoop .fs .FileSystem ;
46
49
import org .junit .jupiter .api .AfterAll ;
47
50
import org .junit .jupiter .api .BeforeAll ;
51
54
public abstract class FlinkEnvIT extends BaseIT {
52
55
private static final Logger LOG = LoggerFactory .getLogger (FlinkEnvIT .class );
53
56
private static final ContainerSuite CONTAINER_SUITE = ContainerSuite .getInstance ();
57
+
58
+ protected static final String icebergRestServiceName = "iceberg-rest" ;
59
+
54
60
protected static final String GRAVITINO_METALAKE = "flink" ;
55
61
protected static final String DEFAULT_CATALOG = "default_catalog" ;
56
62
@@ -65,32 +71,68 @@ public abstract class FlinkEnvIT extends BaseIT {
65
71
66
72
private static String gravitinoUri = "http://127.0.0.1:8090" ;
67
73
74
+ private final String lakeHouseIcebergProvider = "lakehouse-iceberg" ;
75
+
76
+ protected String icebergRestServiceUri ;
77
+
68
78
@ BeforeAll
69
- void startUp () {
79
+ void startUp () throws Exception {
80
+ initHiveEnv ();
81
+ if (lakeHouseIcebergProvider .equalsIgnoreCase (getProvider ())) {
82
+ initIcebergRestServiceEnv ();
83
+ }
70
84
// Start Gravitino server
85
+ super .startIntegrationTest ();
71
86
initGravitinoEnv ();
72
87
initMetalake ();
73
- initHiveEnv ();
74
88
initHdfsEnv ();
75
89
initFlinkEnv ();
76
90
LOG .info ("Startup Flink env successfully, Gravitino uri: {}." , gravitinoUri );
77
91
}
78
92
79
93
@ AfterAll
80
- static void stop () {
94
+ void stop () throws IOException , InterruptedException {
81
95
stopFlinkEnv ();
82
96
stopHdfsEnv ();
97
+ super .stopIntegrationTest ();
83
98
LOG .info ("Stop Flink env successfully." );
84
99
}
85
100
86
101
protected String flinkByPass (String key ) {
87
102
return PropertiesConverter .FLINK_PROPERTY_PREFIX + key ;
88
103
}
89
104
105
+ private void initIcebergRestServiceEnv () {
106
+ ignoreIcebergRestService = false ;
107
+ Map <String , String > icebergRestServiceConfigs = new HashMap <>();
108
+ icebergRestServiceConfigs .put (
109
+ "gravitino."
110
+ + icebergRestServiceName
111
+ + "."
112
+ + IcebergPropertiesConstants .GRAVITINO_ICEBERG_CATALOG_BACKEND ,
113
+ IcebergPropertiesConstants .ICEBERG_CATALOG_BACKEND_HIVE );
114
+ icebergRestServiceConfigs .put (
115
+ "gravitino."
116
+ + icebergRestServiceName
117
+ + "."
118
+ + IcebergPropertiesConstants .GRAVITINO_ICEBERG_CATALOG_URI ,
119
+ hiveMetastoreUri );
120
+ icebergRestServiceConfigs .put (
121
+ "gravitino."
122
+ + icebergRestServiceName
123
+ + "."
124
+ + IcebergPropertiesConstants .GRAVITINO_ICEBERG_CATALOG_WAREHOUSE ,
125
+ warehouse );
126
+ registerCustomConfigs (icebergRestServiceConfigs );
127
+ }
128
+
90
129
private void initGravitinoEnv () {
91
130
// Gravitino server is already started by AbstractIT, just construct gravitinoUrl
92
131
int gravitinoPort = getGravitinoServerPort ();
93
132
gravitinoUri = String .format ("http://127.0.0.1:%d" , gravitinoPort );
133
+ if (lakeHouseIcebergProvider .equalsIgnoreCase (getProvider ())) {
134
+ this .icebergRestServiceUri = getIcebergRestServiceUri ();
135
+ }
94
136
}
95
137
96
138
private void initMetalake () {
@@ -212,4 +254,14 @@ protected static void clearTableInSchema() {
212
254
TestUtils .assertTableResult (deleteResult , ResultKind .SUCCESS );
213
255
}
214
256
}
257
+
258
+ private String getIcebergRestServiceUri () {
259
+ JettyServerConfig jettyServerConfig =
260
+ JettyServerConfig .fromConfig (
261
+ serverConfig , String .format ("gravitino.%s." , icebergRestServiceName ));
262
+ return String .format (
263
+ "http://%s:%d/iceberg/" , jettyServerConfig .getHost (), jettyServerConfig .getHttpPort ());
264
+ }
265
+
266
+ protected abstract String getProvider ();
215
267
}
0 commit comments