
Encapsulate contained in, entity vars, and obs by entity type in sql client (#1497)
keyurva authored Jan 10, 2025
1 parent 562cb07 commit 2eab7f6
Showing 15 changed files with 354 additions and 97 deletions.
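
The refactored call sites below consume typed rows (row.SubjectID, row.Entity, row.Variables, and so on) returned by new methods on the sqldb client. The sqldb client changes themselves are among the files not shown in this excerpt, so the following is only a sketch of the row types, with field names inferred from the call sites and everything else, including the EntityVariables name, assumed:

// Sketch only: field names come from usage in this diff; the real structs in
// internal/sqldb may carry extra fields, different names, or struct tags.
package sqldb

// SubjectObject holds one (subject_id, object_id) pair from the triples table.
type SubjectObject struct {
    SubjectID string
    ObjectID  string
}

// Observation holds one row of the observations table.
type Observation struct {
    Entity     string
    Variable   string
    Date       string
    Value      float64
    Provenance string
}

// EntityVariables (hypothetical name) pairs an entity with the distinct
// variables that have observations for it.
type EntityVariables struct {
    Entity    string
    Variables []string
}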
51 changes: 13 additions & 38 deletions internal/server/placein/placein.go
@@ -16,8 +16,8 @@ package placein

import (
    "context"
-   "fmt"

+   "github.com/datacommonsorg/mixer/internal/sqldb"
    "github.com/datacommonsorg/mixer/internal/store/bigtable"
    "google.golang.org/protobuf/proto"

@@ -76,54 +76,29 @@ func GetPlacesIn(
            }
        }
    }
-   if store.SQLClient.DB != nil {
-       var query string
-       var args []string
+   if sqldb.IsConnected(&store.SQLClient) {
+       var rows []*sqldb.SubjectObject
+       var err error

        if len(parentPlaces) == 1 && parentPlaces[0] == childPlaceType {
            // When ancestor == child (typically requested for non-place entities), get all entities of that type.
-           query =
-               `
-                   SELECT subject_id, object_id
-                   FROM triples
-                   WHERE predicate = 'typeOf'
-                   AND object_id = ?;
-               `
-           args = []string{childPlaceType}
+           rows, err = store.SQLClient.GetAllEntitiesOfType(ctx, childPlaceType)
+           if err != nil {
+               return nil, err
+           }
        } else {
            // Only queries based on direct containedInPlace for now.
            // This could extend to more hops and even link with BT cache data, but that
            // might make it too complicated.
            // In custom DC, it's reasonable to ask user to provide direct containment
            // relation.
-           query = fmt.Sprintf(
-               `
-                   SELECT t1.subject_id, t2.object_id
-                   FROM triples t1
-                   JOIN triples t2
-                   ON t1.subject_id = t2.subject_id
-                   WHERE t1.predicate = 'typeOf'
-                   AND t1.object_id = ?
-                   AND t2.predicate = 'containedInPlace'
-                   AND t2.object_id IN (%s);
-               `,
-               util.SQLInParam(len(parentPlaces)),
-           )
-           args = []string{childPlaceType}
-           args = append(args, parentPlaces...)
-       }
-
-       // Execute query
-       rows, err := store.SQLClient.DB.Query(query, util.ConvertArgs(args)...)
-       if err != nil {
-           return nil, err
-       }
-       defer rows.Close()
-       for rows.Next() {
-           var child, parent string
-           err = rows.Scan(&child, &parent)
+           rows, err = store.SQLClient.GetContainedInPlace(ctx, childPlaceType, parentPlaces)
            if err != nil {
                return nil, err
            }
        }
+       for _, row := range rows {
+           child, parent := row.SubjectID, row.ObjectID
            result[parent] = append(result[parent], child)
        }
    }
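
The inline SQL deleted above presumably moved into the sqldb client close to verbatim. Below is a hedged sketch of the two methods this file now calls, plus the IsConnected check that replaces the bare DB != nil test. It reuses the row types sketched at the top of this page; the SQLClient shape is simplified and querySubjectObject is a hypothetical shared helper, since the real implementations are in files not shown here:

package sqldb

import (
    "context"
    "database/sql"
    "fmt"
    "strings"
)

// SQLClient wraps the database handle (simplified; the real struct has more).
type SQLClient struct {
    DB *sql.DB
}

// IsConnected reports whether the client has a usable database handle.
func IsConnected(sc *SQLClient) bool {
    return sc != nil && sc.DB != nil
}

// GetAllEntitiesOfType returns every (entity, type) pair for one entity type,
// using the first query removed above.
func (sc *SQLClient) GetAllEntitiesOfType(ctx context.Context, typeOf string) ([]*SubjectObject, error) {
    query := `
        SELECT subject_id, object_id
        FROM triples
        WHERE predicate = 'typeOf'
        AND object_id = ?;
    `
    return sc.querySubjectObject(ctx, query, typeOf)
}

// GetContainedInPlace returns (child, parent) pairs where each child has the
// given type and is directly containedInPlace one of the parents, using the
// second query removed above.
func (sc *SQLClient) GetContainedInPlace(ctx context.Context, childPlaceType string, parentPlaces []string) ([]*SubjectObject, error) {
    query := fmt.Sprintf(`
        SELECT t1.subject_id, t2.object_id
        FROM triples t1
        JOIN triples t2 ON t1.subject_id = t2.subject_id
        WHERE t1.predicate = 'typeOf'
        AND t1.object_id = ?
        AND t2.predicate = 'containedInPlace'
        AND t2.object_id IN (%s);`,
        // One "?" per parent, e.g. "?, ?, ?" (util.SQLInParam in the old code).
        strings.TrimSuffix(strings.Repeat("?, ", len(parentPlaces)), ", "))
    args := []string{childPlaceType}
    args = append(args, parentPlaces...)
    return sc.querySubjectObject(ctx, query, args...)
}

// querySubjectObject runs a two-column query and scans each row into a
// SubjectObject (hypothetical helper).
func (sc *SQLClient) querySubjectObject(ctx context.Context, query string, args ...string) ([]*SubjectObject, error) {
    queryArgs := make([]interface{}, len(args))
    for i, a := range args {
        queryArgs[i] = a
    }
    rows, err := sc.DB.QueryContext(ctx, query, queryArgs...)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    var result []*SubjectObject
    for rows.Next() {
        row := &SubjectObject{}
        if err := rows.Scan(&row.SubjectID, &row.ObjectID); err != nil {
            return nil, err
        }
        result = append(result, row)
    }
    return result, rows.Err()
}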
27 changes: 5 additions & 22 deletions internal/server/statvar/fetcher/entity_sv_fetcher.go
@@ -16,10 +16,9 @@ package fetcher

import (
    "context"
-   "fmt"
-   "strings"

    pb "github.com/datacommonsorg/mixer/internal/proto"
+   "github.com/datacommonsorg/mixer/internal/sqldb"
    "github.com/datacommonsorg/mixer/internal/store"
    "github.com/datacommonsorg/mixer/internal/store/bigtable"
    "github.com/datacommonsorg/mixer/internal/util"
@@ -72,29 +71,13 @@ func FetchEntityVariables(
        }
    }
    // Fetch from SQL database
-   if store.SQLClient.DB != nil {
-       query := fmt.Sprintf(
-           `
-               SELECT entity, GROUP_CONCAT(DISTINCT variable) AS variables
-               FROM observations WHERE entity in (%s)
-               GROUP BY entity;
-           `,
-           util.SQLInParam(len(entities)),
-       )
-       // Execute query
-       rows, err := store.SQLClient.DB.Query(query, util.ConvertArgs(entities)...)
+   if sqldb.IsConnected(&store.SQLClient) {
+       rows, err := store.SQLClient.GetEntityVariables(ctx, entities)
        if err != nil {
            return nil, err
        }
-       defer rows.Close()
-       for rows.Next() {
-           var entity, variableStr string
-           err = rows.Scan(&entity, &variableStr)
-           if err != nil {
-               return nil, err
-           }
-           variables := strings.Split(variableStr, ",")
-           resp[entity].StatVars = util.MergeDedupe(resp[entity].StatVars, variables)
+       for _, row := range rows {
+           resp[row.Entity].StatVars = util.MergeDedupe(resp[row.Entity].StatVars, row.Variables)
        }
    }
    return resp, nil
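
A similar hedged sketch for GetEntityVariables, in the same assumed sqldb package as above: the GROUP_CONCAT query removed here moves behind the client, which also splits the concatenated list so that callers get a []string per entity. Only the method signature and the query text are visible in this diff; the body and the EntityVariables row type are assumptions:

// GetEntityVariables returns, per entity, the distinct variables that have
// observations, splitting SQLite's GROUP_CONCAT output into a slice.
func (sc *SQLClient) GetEntityVariables(ctx context.Context, entities []string) ([]*EntityVariables, error) {
    query := fmt.Sprintf(`
        SELECT entity, GROUP_CONCAT(DISTINCT variable) AS variables
        FROM observations WHERE entity IN (%s)
        GROUP BY entity;`,
        strings.TrimSuffix(strings.Repeat("?, ", len(entities)), ", "))
    queryArgs := make([]interface{}, len(entities))
    for i, e := range entities {
        queryArgs[i] = e
    }
    rows, err := sc.DB.QueryContext(ctx, query, queryArgs...)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    var result []*EntityVariables
    for rows.Next() {
        var entity, variableStr string
        if err := rows.Scan(&entity, &variableStr); err != nil {
            return nil, err
        }
        result = append(result, &EntityVariables{
            Entity:    entity,
            Variables: strings.Split(variableStr, ","),
        })
    }
    return result, rows.Err()
}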
30 changes: 4 additions & 26 deletions internal/server/v2/observation/contained_in.go
@@ -17,11 +17,9 @@ package observation

import (
    "context"
-   "fmt"
    "log"
    "net/http"
    "sort"
-   "strings"

"github.com/datacommonsorg/mixer/internal/merger"
pb "github.com/datacommonsorg/mixer/internal/proto"
@@ -30,6 +28,7 @@
"github.com/datacommonsorg/mixer/internal/server/resource"
"github.com/datacommonsorg/mixer/internal/server/stat"
"github.com/datacommonsorg/mixer/internal/server/v2/shared"
"github.com/datacommonsorg/mixer/internal/sqldb"
"github.com/datacommonsorg/mixer/internal/sqldb/sqlquery"
"github.com/datacommonsorg/mixer/internal/store/bigtable"
"github.com/datacommonsorg/mixer/internal/util"
@@ -183,35 +182,14 @@ func FetchContainedIn(

    // Fetch Data from SQLite database.
    var sqlResult *pbv2.ObservationResponse
-   if store.SQLClient.DB != nil {
+   if sqldb.IsConnected(&store.SQLClient) {
        if ancestor == childType {
            sqlResult = initObservationResult(variables)
-           variablesStr := "'" + strings.Join(variables, "', '") + "'"
-           query := fmt.Sprintf(
-               `
-                   SELECT entity, variable, date, value, provenance FROM observations as o
-                   JOIN triples as t ON o.entity = t.subject_id
-                   AND t.predicate = 'typeOf'
-                   AND t.object_id = '%s'
-                   AND o.value != ''
-                   AND o.variable IN (%s)
-               `,
-               childType,
-               variablesStr,
-           )
-           if queryDate != "" && queryDate != shared.LATEST {
-               query += fmt.Sprintf("AND date = (%s) ", queryDate)
-           }
-           query += "ORDER BY date ASC;"
-           rows, err := store.SQLClient.DB.Query(query)
-           if err != nil {
-               return nil, err
-           }
-           defer rows.Close()
-           tmp, err := handleSQLRows(rows, variables)
+           rows, err := store.SQLClient.GetObservationsByEntityType(ctx, variables, childType, queryDate)
            if err != nil {
                return nil, err
            }
+           tmp := handleSQLRows(rows, variables)
            sqlResult = processSqlData(sqlResult, tmp, queryDate, sqlProvenances)
        } else {
            if len(childPlaces) == 0 {
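
The observations query likewise moves behind the client, and handleSQLRows now takes the scanned []*sqldb.Observation instead of raw *sql.Rows (see helper.go below). A sketch under the same assumptions, again in the assumed sqldb package; unlike the removed code, which spliced queryDate directly into the SQL string, this version binds it as a parameter, and the string "LATEST" stands in for shared.LATEST:

// GetObservationsByEntityType returns observations for the given variables
// across all entities of entityType, optionally filtered to one date.
func (sc *SQLClient) GetObservationsByEntityType(ctx context.Context, variables []string, entityType, date string) ([]*Observation, error) {
    query := fmt.Sprintf(`
        SELECT entity, variable, date, value, provenance FROM observations AS o
        JOIN triples AS t ON o.entity = t.subject_id
        AND t.predicate = 'typeOf'
        AND t.object_id = ?
        AND o.value != ''
        AND o.variable IN (%s)`,
        strings.TrimSuffix(strings.Repeat("?, ", len(variables)), ", "))
    args := []interface{}{entityType}
    for _, v := range variables {
        args = append(args, v)
    }
    if date != "" && date != "LATEST" {
        query += " AND date = ?"
        args = append(args, date)
    }
    query += " ORDER BY date ASC;"
    rows, err := sc.DB.QueryContext(ctx, query, args...)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    var result []*Observation
    for rows.Next() {
        row := &Observation{}
        if err := rows.Scan(&row.Entity, &row.Variable, &row.Date, &row.Value, &row.Provenance); err != nil {
            return nil, err
        }
        result = append(result, row)
    }
    return result, rows.Err()
}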
@@ -0,0 +1,46 @@
{
"by_variable": {
"test_var_3": {
"by_entity": {
"ein/1": {
"ordered_facets": [
{
"facet_id": "custom",
"observations": [
{
"date": "2015",
"value": 1
}
],
"obs_count": 1,
"earliest_date": "2015",
"latest_date": "2015"
}
]
},
"ein/2": {
"ordered_facets": [
{
"facet_id": "custom",
"observations": [
{
"date": "2015",
"value": 2
}
],
"obs_count": 1,
"earliest_date": "2015",
"latest_date": "2015"
}
]
}
}
}
},
"facets": {
"custom": {
"import_name": "Custom Prov",
"provenance_url": "custom.datacommons.org"
}
}
}
@@ -87,6 +87,12 @@ func TestFetchContainIn(t *testing.T) {
&pbv2.FacetFilter{FacetIds: []string{"2176550201", "10983471"}},
"US_State_Multi_Facet_Id.json",
},
{
[]string{"test_var_3"},
"Company<-containedInPlace+{typeOf:Company}",
nil,
"companies.json",
},
} {
goldenFile := c.goldenFile
resp, err := mixer.V2Observation(ctx, &pbv2.ObservationRequest{
@@ -0,0 +1,70 @@
{
"by_variable": {
"test_var_3": {
"by_entity": {
"ein/1": {
"ordered_facets": [
{
"facet_id": "custom",
"observations": [
{
"date": "2015",
"value": 1
},
{
"date": "2023",
"value": 100
}
],
"obs_count": 2,
"earliest_date": "2015",
"latest_date": "2023"
}
]
},
"ein/2": {
"ordered_facets": [
{
"facet_id": "custom",
"observations": [
{
"date": "2015",
"value": 2
},
{
"date": "2023",
"value": 200
}
],
"obs_count": 2,
"earliest_date": "2015",
"latest_date": "2023"
}
]
},
"ein/3": {
"ordered_facets": [
{
"facet_id": "custom",
"observations": [
{
"date": "2023",
"value": 300
}
],
"obs_count": 1,
"earliest_date": "2023",
"latest_date": "2023"
}
]
}
}
}
},
"facets": {
"custom": {
"import_name": "Custom Prov",
"provenance_url": "custom.datacommons.org"
}
}
}
@@ -72,6 +72,11 @@ func TestFetchContainInAll(t *testing.T) {
"country/FRA<-containedInPlace+{typeOf:AdministrativeArea2}",
"FRA_AA2.json",
},
{
[]string{"test_var_3"},
"Company<-containedInPlace+{typeOf:Company}",
"companies.json",
},
} {
goldenFile := c.goldenFile
resp, err := mixer.V2Observation(ctx, &pbv2.ObservationRequest{
1 change: 1 addition & 0 deletions internal/server/v2/observation/golden/variable/empty.json
@@ -0,0 +1 @@
{}
4 changes: 4 additions & 0 deletions internal/server/v2/observation/golden/variable_test.go
@@ -43,6 +43,10 @@ func TestVariable(t *testing.T) {
[]string{"wikidataId/Q506877", "wikidataId/Q1951", "fake_place", "test_entity"},
"result.json",
},
{
[]string{},
"empty.json",
},
} {
goldenFile := c.goldenFile
resp, err := mixer.V2Observation(ctx, &pbv2.ObservationRequest{
18 changes: 7 additions & 11 deletions internal/server/v2/observation/helper.go
@@ -16,11 +16,10 @@
package observation

import (
-   "database/sql"
-
    pb "github.com/datacommonsorg/mixer/internal/proto"
    pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
    "github.com/datacommonsorg/mixer/internal/server/v2/shared"
+   "github.com/datacommonsorg/mixer/internal/sqldb"
    "google.golang.org/protobuf/proto"
)

@@ -38,20 +37,17 @@ func initObservationResult(variables []string) *pbv2.ObservationResponse {
}

func handleSQLRows(
-   rows *sql.Rows,
+   rows []*sqldb.Observation,
    variables []string,
-) (map[string]map[string]map[string][]*pb.PointStat, error) {
+) map[string]map[string]map[string][]*pb.PointStat {
    // result is keyed by variable, entity and provID
    result := make(map[string]map[string]map[string][]*pb.PointStat)
    for _, variable := range variables {
        result[variable] = make(map[string]map[string][]*pb.PointStat)
    }
-   for rows.Next() {
-       var entity, variable, date, prov string
-       var value float64
-       if err := rows.Scan(&entity, &variable, &date, &value, &prov); err != nil {
-           return nil, err
-       }
+   for _, row := range rows {
+       entity, variable, date, prov := row.Entity, row.Variable, row.Date, row.Provenance
+       value := row.Value
        if result[variable][entity] == nil {
            result[variable][entity] = map[string][]*pb.PointStat{}
        }
@@ -66,7 +62,7 @@ func handleSQLRows(
            },
        )
    }
-   return result, rows.Err()
+   return result
}

func processSqlData(
