From 2eab7f6e2bc3e8b13f58d443f1317cf41d4f8af4 Mon Sep 17 00:00:00 2001 From: Keyur Shah Date: Fri, 10 Jan 2025 12:20:13 -0800 Subject: [PATCH] Encapsulate contained in, entity vars, and obs by entity type in sql client (#1497) --- internal/server/placein/placein.go | 51 ++----- .../statvar/fetcher/entity_sv_fetcher.go | 27 +--- .../server/v2/observation/contained_in.go | 30 +---- .../golden/contained_in_2015/companies.json | 46 +++++++ .../golden/contained_in_2015_test.go | 6 + .../golden/contained_in_all/companies.json | 70 ++++++++++ .../golden/contained_in_all_test.go | 5 + .../v2/observation/golden/variable/empty.json | 1 + .../v2/observation/golden/variable_test.go | 4 + internal/server/v2/observation/helper.go | 18 +-- internal/sqldb/model.go | 12 ++ internal/sqldb/query.go | 124 ++++++++++++++++++ internal/sqldb/statements.go | 55 ++++++++ test/datacommons.db | Bin 32768 -> 32768 bytes test/observations.csv | 2 + 15 files changed, 354 insertions(+), 97 deletions(-) create mode 100644 internal/server/v2/observation/golden/contained_in_2015/companies.json create mode 100644 internal/server/v2/observation/golden/contained_in_all/companies.json create mode 100644 internal/server/v2/observation/golden/variable/empty.json diff --git a/internal/server/placein/placein.go b/internal/server/placein/placein.go index a9e9aeb22..c6fc2b0c7 100644 --- a/internal/server/placein/placein.go +++ b/internal/server/placein/placein.go @@ -16,8 +16,8 @@ package placein import ( "context" - "fmt" + "github.com/datacommonsorg/mixer/internal/sqldb" "github.com/datacommonsorg/mixer/internal/store/bigtable" "google.golang.org/protobuf/proto" @@ -76,54 +76,29 @@ func GetPlacesIn( } } } - if store.SQLClient.DB != nil { - var query string - var args []string + if sqldb.IsConnected(&store.SQLClient) { + var rows []*sqldb.SubjectObject + var err error + if len(parentPlaces) == 1 && parentPlaces[0] == childPlaceType { // When ancestor == child (typically requested for non-place entities), get all entities of that type. - query = - ` - SELECT subject_id, object_id - FROM triples - WHERE predicate = 'typeOf' - AND object_id = ?; - ` - args = []string{childPlaceType} + rows, err = store.SQLClient.GetAllEntitiesOfType(ctx, childPlaceType) + if err != nil { + return nil, err + } } else { // Only queries based on direct containedInPlace for now. // This could extend to more hops and even link with BT cache data, but that // might make it too complicated. // In custom DC, it's reasonable to ask user to provide direct containment // relation. - query = fmt.Sprintf( - ` - SELECT t1.subject_id, t2.object_id - FROM triples t1 - JOIN triples t2 - ON t1.subject_id = t2.subject_id - WHERE t1.predicate = 'typeOf' - AND t1.object_id = ? - AND t2.predicate = 'containedInPlace' - AND t2.object_id IN (%s); - `, - util.SQLInParam(len(parentPlaces)), - ) - args = []string{childPlaceType} - args = append(args, parentPlaces...) - } - - // Execute query - rows, err := store.SQLClient.DB.Query(query, util.ConvertArgs(args)...) - if err != nil { - return nil, err - } - defer rows.Close() - for rows.Next() { - var child, parent string - err = rows.Scan(&child, &parent) + rows, err = store.SQLClient.GetContainedInPlace(ctx, childPlaceType, parentPlaces) if err != nil { return nil, err } + } + for _, row := range rows { + child, parent := row.SubjectID, row.ObjectID result[parent] = append(result[parent], child) } } diff --git a/internal/server/statvar/fetcher/entity_sv_fetcher.go b/internal/server/statvar/fetcher/entity_sv_fetcher.go index 6bd5871f0..68641253b 100644 --- a/internal/server/statvar/fetcher/entity_sv_fetcher.go +++ b/internal/server/statvar/fetcher/entity_sv_fetcher.go @@ -16,10 +16,9 @@ package fetcher import ( "context" - "fmt" - "strings" pb "github.com/datacommonsorg/mixer/internal/proto" + "github.com/datacommonsorg/mixer/internal/sqldb" "github.com/datacommonsorg/mixer/internal/store" "github.com/datacommonsorg/mixer/internal/store/bigtable" "github.com/datacommonsorg/mixer/internal/util" @@ -72,29 +71,13 @@ func FetchEntityVariables( } } // Fetch from SQL database - if store.SQLClient.DB != nil { - query := fmt.Sprintf( - ` - SELECT entity, GROUP_CONCAT(DISTINCT variable) AS variables - FROM observations WHERE entity in (%s) - GROUP BY entity; - `, - util.SQLInParam(len(entities)), - ) - // Execute query - rows, err := store.SQLClient.DB.Query(query, util.ConvertArgs(entities)...) + if sqldb.IsConnected(&store.SQLClient) { + rows, err := store.SQLClient.GetEntityVariables(ctx, entities) if err != nil { return nil, err } - defer rows.Close() - for rows.Next() { - var entity, variableStr string - err = rows.Scan(&entity, &variableStr) - if err != nil { - return nil, err - } - variables := strings.Split(variableStr, ",") - resp[entity].StatVars = util.MergeDedupe(resp[entity].StatVars, variables) + for _, row := range rows { + resp[row.Entity].StatVars = util.MergeDedupe(resp[row.Entity].StatVars, row.Variables) } } return resp, nil diff --git a/internal/server/v2/observation/contained_in.go b/internal/server/v2/observation/contained_in.go index 190688499..6626f0574 100644 --- a/internal/server/v2/observation/contained_in.go +++ b/internal/server/v2/observation/contained_in.go @@ -17,11 +17,9 @@ package observation import ( "context" - "fmt" "log" "net/http" "sort" - "strings" "github.com/datacommonsorg/mixer/internal/merger" pb "github.com/datacommonsorg/mixer/internal/proto" @@ -30,6 +28,7 @@ import ( "github.com/datacommonsorg/mixer/internal/server/resource" "github.com/datacommonsorg/mixer/internal/server/stat" "github.com/datacommonsorg/mixer/internal/server/v2/shared" + "github.com/datacommonsorg/mixer/internal/sqldb" "github.com/datacommonsorg/mixer/internal/sqldb/sqlquery" "github.com/datacommonsorg/mixer/internal/store/bigtable" "github.com/datacommonsorg/mixer/internal/util" @@ -183,35 +182,14 @@ func FetchContainedIn( // Fetch Data from SQLite database. var sqlResult *pbv2.ObservationResponse - if store.SQLClient.DB != nil { + if sqldb.IsConnected(&store.SQLClient) { if ancestor == childType { sqlResult = initObservationResult(variables) - variablesStr := "'" + strings.Join(variables, "', '") + "'" - query := fmt.Sprintf( - ` - SELECT entity, variable, date, value, provenance FROM observations as o - JOIN triples as t ON o.entity = t.subject_id - AND t.predicate = 'typeOf' - AND t.object_id = '%s' - AND o.value != '' - AND o.variable IN (%s) - `, - childType, - variablesStr, - ) - if queryDate != "" && queryDate != shared.LATEST { - query += fmt.Sprintf("AND date = (%s) ", queryDate) - } - query += "ORDER BY date ASC;" - rows, err := store.SQLClient.DB.Query(query) - if err != nil { - return nil, err - } - defer rows.Close() - tmp, err := handleSQLRows(rows, variables) + rows, err := store.SQLClient.GetObservationsByEntityType(ctx, variables, childType, queryDate) if err != nil { return nil, err } + tmp := handleSQLRows(rows, variables) sqlResult = processSqlData(sqlResult, tmp, queryDate, sqlProvenances) } else { if len(childPlaces) == 0 { diff --git a/internal/server/v2/observation/golden/contained_in_2015/companies.json b/internal/server/v2/observation/golden/contained_in_2015/companies.json new file mode 100644 index 000000000..7ad45de8c --- /dev/null +++ b/internal/server/v2/observation/golden/contained_in_2015/companies.json @@ -0,0 +1,46 @@ +{ + "by_variable": { + "test_var_3": { + "by_entity": { + "ein/1": { + "ordered_facets": [ + { + "facet_id": "custom", + "observations": [ + { + "date": "2015", + "value": 1 + } + ], + "obs_count": 1, + "earliest_date": "2015", + "latest_date": "2015" + } + ] + }, + "ein/2": { + "ordered_facets": [ + { + "facet_id": "custom", + "observations": [ + { + "date": "2015", + "value": 2 + } + ], + "obs_count": 1, + "earliest_date": "2015", + "latest_date": "2015" + } + ] + } + } + } + }, + "facets": { + "custom": { + "import_name": "Custom Prov", + "provenance_url": "custom.datacommons.org" + } + } +} \ No newline at end of file diff --git a/internal/server/v2/observation/golden/contained_in_2015_test.go b/internal/server/v2/observation/golden/contained_in_2015_test.go index c9dd82310..c97642ae8 100644 --- a/internal/server/v2/observation/golden/contained_in_2015_test.go +++ b/internal/server/v2/observation/golden/contained_in_2015_test.go @@ -87,6 +87,12 @@ func TestFetchContainIn(t *testing.T) { &pbv2.FacetFilter{FacetIds: []string{"2176550201", "10983471"}}, "US_State_Multi_Facet_Id.json", }, + { + []string{"test_var_3"}, + "Company<-containedInPlace+{typeOf:Company}", + nil, + "companies.json", + }, } { goldenFile := c.goldenFile resp, err := mixer.V2Observation(ctx, &pbv2.ObservationRequest{ diff --git a/internal/server/v2/observation/golden/contained_in_all/companies.json b/internal/server/v2/observation/golden/contained_in_all/companies.json new file mode 100644 index 000000000..6b24f1ca4 --- /dev/null +++ b/internal/server/v2/observation/golden/contained_in_all/companies.json @@ -0,0 +1,70 @@ +{ + "by_variable": { + "test_var_3": { + "by_entity": { + "ein/1": { + "ordered_facets": [ + { + "facet_id": "custom", + "observations": [ + { + "date": "2015", + "value": 1 + }, + { + "date": "2023", + "value": 100 + } + ], + "obs_count": 2, + "earliest_date": "2015", + "latest_date": "2023" + } + ] + }, + "ein/2": { + "ordered_facets": [ + { + "facet_id": "custom", + "observations": [ + { + "date": "2015", + "value": 2 + }, + { + "date": "2023", + "value": 200 + } + ], + "obs_count": 2, + "earliest_date": "2015", + "latest_date": "2023" + } + ] + }, + "ein/3": { + "ordered_facets": [ + { + "facet_id": "custom", + "observations": [ + { + "date": "2023", + "value": 300 + } + ], + "obs_count": 1, + "earliest_date": "2023", + "latest_date": "2023" + } + ] + } + } + } + }, + "facets": { + "custom": { + "import_name": "Custom Prov", + "provenance_url": "custom.datacommons.org" + } + } +} \ No newline at end of file diff --git a/internal/server/v2/observation/golden/contained_in_all_test.go b/internal/server/v2/observation/golden/contained_in_all_test.go index 862a19a77..b1dcc31e5 100644 --- a/internal/server/v2/observation/golden/contained_in_all_test.go +++ b/internal/server/v2/observation/golden/contained_in_all_test.go @@ -72,6 +72,11 @@ func TestFetchContainInAll(t *testing.T) { "country/FRA<-containedInPlace+{typeOf:AdministrativeArea2}", "FRA_AA2.json", }, + { + []string{"test_var_3"}, + "Company<-containedInPlace+{typeOf:Company}", + "companies.json", + }, } { goldenFile := c.goldenFile resp, err := mixer.V2Observation(ctx, &pbv2.ObservationRequest{ diff --git a/internal/server/v2/observation/golden/variable/empty.json b/internal/server/v2/observation/golden/variable/empty.json new file mode 100644 index 000000000..9e26dfeeb --- /dev/null +++ b/internal/server/v2/observation/golden/variable/empty.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/internal/server/v2/observation/golden/variable_test.go b/internal/server/v2/observation/golden/variable_test.go index f0328d934..11f9ca9dc 100644 --- a/internal/server/v2/observation/golden/variable_test.go +++ b/internal/server/v2/observation/golden/variable_test.go @@ -43,6 +43,10 @@ func TestVariable(t *testing.T) { []string{"wikidataId/Q506877", "wikidataId/Q1951", "fake_place", "test_entity"}, "result.json", }, + { + []string{}, + "empty.json", + }, } { goldenFile := c.goldenFile resp, err := mixer.V2Observation(ctx, &pbv2.ObservationRequest{ diff --git a/internal/server/v2/observation/helper.go b/internal/server/v2/observation/helper.go index e4fdf0910..6bea4693c 100644 --- a/internal/server/v2/observation/helper.go +++ b/internal/server/v2/observation/helper.go @@ -16,11 +16,10 @@ package observation import ( - "database/sql" - pb "github.com/datacommonsorg/mixer/internal/proto" pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2" "github.com/datacommonsorg/mixer/internal/server/v2/shared" + "github.com/datacommonsorg/mixer/internal/sqldb" "google.golang.org/protobuf/proto" ) @@ -38,20 +37,17 @@ func initObservationResult(variables []string) *pbv2.ObservationResponse { } func handleSQLRows( - rows *sql.Rows, + rows []*sqldb.Observation, variables []string, -) (map[string]map[string]map[string][]*pb.PointStat, error) { +) map[string]map[string]map[string][]*pb.PointStat { // result is keyed by variable, entity and provID result := make(map[string]map[string]map[string][]*pb.PointStat) for _, variable := range variables { result[variable] = make(map[string]map[string][]*pb.PointStat) } - for rows.Next() { - var entity, variable, date, prov string - var value float64 - if err := rows.Scan(&entity, &variable, &date, &value, &prov); err != nil { - return nil, err - } + for _, row := range rows { + entity, variable, date, prov := row.Entity, row.Variable, row.Date, row.Provenance + value := row.Value if result[variable][entity] == nil { result[variable][entity] = map[string][]*pb.PointStat{} } @@ -66,7 +62,7 @@ func handleSQLRows( }, ) } - return result, rows.Err() + return result } func processSqlData( diff --git a/internal/sqldb/model.go b/internal/sqldb/model.go index 0b85e9223..56cc2b9c7 100644 --- a/internal/sqldb/model.go +++ b/internal/sqldb/model.go @@ -98,3 +98,15 @@ type NodePredicate struct { Node string `db:"node"` Predicate string `db:"predicate"` } + +// SubjectObject represents a row for (subject_id, object_id) pairs. +type SubjectObject struct { + SubjectID string `db:"subject_id"` + ObjectID string `db:"object_id"` +} + +// EntityVariables represents a row that includes an entity and its variables. +type EntityVariables struct { + Entity string `db:"entity"` + Variables StringSlice `db:"variables"` +} diff --git a/internal/sqldb/query.go b/internal/sqldb/query.go index 1d44d1450..a7848f875 100644 --- a/internal/sqldb/query.go +++ b/internal/sqldb/query.go @@ -74,6 +74,49 @@ func (sc *SQLClient) GetObservations(ctx context.Context, variables []string, en return observations, nil } +// GetObservationsByEntityType retrieves observations from SQL given a list of variables and an entity type and a date. +func (sc *SQLClient) GetObservationsByEntityType(ctx context.Context, variables []string, entityType string, date string) ([]*Observation, error) { + defer util.TimeTrack(time.Now(), "SQL: GetObservationsByEntityType") + + var observations []*Observation + if len(variables) == 0 { + return observations, nil + } + + var stmt statement + + switch { + case date != "" && date != latestDate: + stmt = statement{ + query: statements.getObsByVariableEntityTypeAndDate, + args: map[string]interface{}{ + "variables": variables, + "entityType": entityType, + "date": date, + }, + } + default: + stmt = statement{ + query: statements.getObsByVariableAndEntityType, + args: map[string]interface{}{ + "variables": variables, + "entityType": entityType, + }, + } + } + + err := sc.queryAndCollect( + ctx, + stmt, + &observations, + ) + if err != nil { + return nil, err + } + + return observations, nil +} + // GetSVSummaries retrieves summaries for the specified variables. func (sc *SQLClient) GetSVSummaries(ctx context.Context, variables []string) ([]*SVSummary, error) { defer util.TimeTrack(time.Now(), "SQL: GetSVSummaries") @@ -241,6 +284,87 @@ func (sc *SQLClient) GetExistingStatVarGroups(ctx context.Context, groupDcids [] return values, nil } +// GetAllEntitiesOfType returns all entities of the specified type. +func (sc *SQLClient) GetAllEntitiesOfType(ctx context.Context, typeOf string) ([]*SubjectObject, error) { + defer util.TimeTrack(time.Now(), "SQL: GetAllEntitiesOfType") + + rows := []*SubjectObject{} + + stmt := statement{ + query: statements.getAllEntitiesOfType, + args: map[string]interface{}{ + "type": typeOf, + }, + } + + err := sc.queryAndCollect( + ctx, + stmt, + &rows, + ) + if err != nil { + return nil, err + } + return rows, nil +} + +// GetContainedInPlace returns all entities of the specified childPlaceType that are contained in the specified parentPlaces. +func (sc *SQLClient) GetContainedInPlace(ctx context.Context, childPlaceType string, parentPlaces []string) ([]*SubjectObject, error) { + defer util.TimeTrack(time.Now(), "SQL: GetContainedInPlace") + + rows := []*SubjectObject{} + + if len(parentPlaces) == 0 { + return rows, nil + } + + stmt := statement{ + query: statements.getContainedInPlace, + args: map[string]interface{}{ + "childPlaceType": childPlaceType, + "parentPlaces": parentPlaces, + }, + } + + err := sc.queryAndCollect( + ctx, + stmt, + &rows, + ) + if err != nil { + return nil, err + } + return rows, nil +} + +// GetEntityVariables returns variables associated with the specified entities. +func (sc *SQLClient) GetEntityVariables(ctx context.Context, entities []string) ([]*EntityVariables, error) { + defer util.TimeTrack(time.Now(), "SQL: GetEntityVariables") + + rows := []*EntityVariables{} + + if len(entities) == 0 { + return rows, nil + } + + stmt := statement{ + query: statements.getEntityVariables, + args: map[string]interface{}{ + "entities": entities, + }, + } + + err := sc.queryAndCollect( + ctx, + stmt, + &rows, + ) + if err != nil { + return nil, err + } + return rows, nil +} + // GetKeyValue gets the value for the specified key from the key_value_store table. // If not found, returns false. // If found, unmarshals the value into the specified proto and returns true. diff --git a/internal/sqldb/statements.go b/internal/sqldb/statements.go index 7d2e8585c..d860b3abe 100644 --- a/internal/sqldb/statements.go +++ b/internal/sqldb/statements.go @@ -19,6 +19,8 @@ package sqldb var statements = struct { getObsByVariableAndEntity string getObsByVariableEntityAndDate string + getObsByVariableAndEntityType string + getObsByVariableEntityTypeAndDate string getStatVarSummaries string getKeyValue string getAllStatVarGroups string @@ -27,6 +29,9 @@ var statements = struct { getSubjectPredicates string getObjectPredicates string getExistingStatVarGroups string + getAllEntitiesOfType string + getContainedInPlace string + getEntityVariables string }{ getObsByVariableAndEntity: ` SELECT entity, variable, date, value, provenance, unit, scaling_factor, measurement_method, observation_period, properties @@ -47,6 +52,31 @@ var statements = struct { AND date = :date ORDER BY date ASC; `, + getObsByVariableAndEntityType: ` + SELECT entity, variable, date, value, provenance, unit, scaling_factor, measurement_method, observation_period, properties + FROM observations AS o + JOIN ( + SELECT DISTINCT subject_id + FROM triples + WHERE predicate = 'typeOf' AND object_id = :entityType + ) AS t ON o.entity = t.subject_id + WHERE o.value != '' + AND o.variable IN (:variables) + ORDER BY date ASC; + `, + getObsByVariableEntityTypeAndDate: ` + SELECT entity, variable, date, value, provenance, unit, scaling_factor, measurement_method, observation_period, properties + FROM observations AS o + JOIN ( + SELECT DISTINCT subject_id + FROM triples + WHERE predicate = 'typeOf' AND object_id = :entityType + ) AS t ON o.entity = t.subject_id + WHERE o.value != '' + AND o.variable IN (:variables) + AND o.date = :date + ORDER BY date ASC; + `, getStatVarSummaries: ` WITH entity_types AS (SELECT @@ -168,4 +198,29 @@ var statements = struct { AND subject_id IN (:groups) AND object_id = 'StatVarGroup'; `, + getAllEntitiesOfType: ` + SELECT subject_id, object_id + FROM triples + WHERE + predicate = 'typeOf' + AND object_id = :type; + `, + getContainedInPlace: ` + SELECT t1.subject_id, t2.object_id + FROM + triples t1 + JOIN triples t2 + ON t1.subject_id = t2.subject_id + WHERE + t1.predicate = 'typeOf' + AND t1.object_id = :childPlaceType + AND t2.predicate = 'containedInPlace' + AND t2.object_id IN (:parentPlaces); + `, + getEntityVariables: ` + SELECT entity, GROUP_CONCAT(DISTINCT variable) variables + FROM observations + WHERE entity in (:entities) + GROUP BY entity; + `, } diff --git a/test/datacommons.db b/test/datacommons.db index 8c63afead8ed6cb87b0e8560640bfadd2bbe3a20..a0b875a7f68a2cb8fe0d03b7a8e66299ca59a962 100644 GIT binary patch delta 181 zcmZo@U}|V!njkHBkb!}L1Bh9Hm<5QNC+Zk89^9C)gr7N>@zP{=ff@YrOh1@TGc97u zW3pmmW4yGnQH7DoD0p(0kj7*$A#TQ?$@_q;Iw9`Kib9$^M#-hcCHc9b!7fGtKnY_; z|H+?(G#LFR3y8{0HW%jCQ}yK*R}|%!lYxPO1Bh9Hm<5PyC+Zk8?%bHLgr7N(NntX(zzl9lrVmVem^zq3 zm=rcPmN8DY64ILdN057RzL4f*10ilk|H(E&#*BWG3xK5W