Commit
Merge pull request #4 from paritytech/feat/Fix_bigquery_connector
Fix big query connection issues
pranaydotparity authored Nov 18, 2024
2 parents 029ddb0 + f75fe00 commit 43bb2da
Showing 13 changed files with 53 additions and 33 deletions.
21 changes: 16 additions & 5 deletions dotlakeIngest.sh
@@ -50,8 +50,10 @@ if [[ $(yq eval '.databases[0].type' config.yaml) == "postgres" ]]; then
SQLALCHEMY_URI="postgres+psycopg2://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME}"
elif [[ $(yq eval '.databases[0].type' config.yaml) == "mysql" ]]; then
SQLALCHEMY_URI="mysql+mysqldb://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME}"
elif [[ $(yq eval '.databases[0].type' config.yaml) == "bigquery" ]]; then
SQLALCHEMY_URI="bigquery://${DB_PROJECT}"
else
echo "Unsupported database type. Only postgres and mysql are supported."
echo "Unsupported database type. Only postgres, bigquery and mysql are supported."
exit 1
fi
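For context, the project-only URI works because the sqlalchemy-bigquery dialect takes credentials separately from the connection string. A minimal sketch of how such a URI can be consumed downstream (the file name and project ID are illustrative, not from this repo):

import json
from sqlalchemy import create_engine

# Hypothetical key file; dotlakeIngest.sh reads the same JSON into DB_CREDENTIALS below.
with open("service-account.json") as f:
    credentials_info = json.load(f)

# "bigquery://<project>" mirrors the SQLALCHEMY_URI built above; credentials_info
# lets the dialect authenticate without a key-file path inside the container.
engine = create_engine("bigquery://my-gcp-project", credentials_info=credentials_info)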

@@ -68,14 +70,19 @@ export DB_PORT="$DB_PORT"
export DB_NAME="$DB_NAME"
export DB_USER="$DB_USER"
export DB_PASSWORD="$DB_PASSWORD"
export PROJECT_ID="$DB_PROJECT"
export DB_PROJECT="$DB_PROJECT"
export CREDENTIALS_PATH="$DB_CRED_PATH"
export DATASET="$DB_DATASET"
export TABLE="$DB_TABLE"
export DB_DATASET="$DB_DATASET"
export DB_TABLE="$DB_TABLE"
export SQLALCHEMY_URI="$SQLALCHEMY_URI"
export INGEST_MODE="$INGEST_MODE"
export START_BLOCK="$START_BLOCK"
export END_BLOCK="$END_BLOCK"
if [[ -n "$DB_CRED_PATH" ]]; then
DB_CREDENTIALS=$(<"$DB_CRED_PATH")
export DB_CREDENTIALS="$DB_CREDENTIALS"
fi
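The $(<"$DB_CRED_PATH") expansion inlines the entire key file, so the containers receive the JSON itself rather than a path into the host filesystem. Downstream code can recover it from the environment, roughly:

import json
import os

# DB_CREDENTIALS holds the key file's contents (exported by dotlakeIngest.sh above);
# parsing it back yields the service-account dict that the BigQuery client expects.
creds = json.loads(os.environ["DB_CREDENTIALS"])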


cd ingest
docker-compose up -d
@@ -88,7 +95,11 @@ docker exec -it superset superset fab create-admin \
--password admin

docker exec -it superset superset db upgrade
docker exec -it superset superset set_database_uri -d my_mysql_db -u "$SQLALCHEMY_URI"
if [[ $DB_NAME == "bigquery" ]]; then
docker exec -it superset superset set_database_uri -d "$DB_NAME" -u "$SQLALCHEMY_URI" -se "{\"credentials_info\": $DB_CREDENTIALS}"
else
docker exec -it superset superset set_database_uri -d "$DB_NAME" -u "$SQLALCHEMY_URI"
fi
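The -se payload hands Superset the raw key JSON as the credentials_info engine parameter. Building the same payload in Python makes the shell quoting easier to follow (a sketch; the key string is a placeholder, not a real credential):

import json

db_credentials = '{"type": "service_account", "project_id": "my-gcp-project"}'  # placeholder
engine_params = json.dumps({"credentials_info": json.loads(db_credentials)})
# engine_params equals what the shell interpolation "{\"credentials_info\": $DB_CREDENTIALS}" produces.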

cd ..

12 changes: 7 additions & 5 deletions ingest/Home.py
@@ -17,10 +17,11 @@ def parse_arguments():
parser.add_argument("--db_path")
parser.add_argument("--db_project")
parser.add_argument("--db_cred_path")
parser.add_argument("--db_credentials")
parser.add_argument("--db_dataset")
parser.add_argument("--db_table")
parser.add_argument("--db_host", required=False, help="Database host")
parser.add_argument("--db_port", required=False, type=int, help="Database port")
parser.add_argument("--db_port", required=False, help="Database port")
parser.add_argument("--db_user", required=False, help="Database user")
parser.add_argument("--db_password", required=False, help="Database password")
parser.add_argument("--db_name", required=False, help="Database name")
@@ -35,6 +36,7 @@ def parse_arguments():
'database_dataset': args.db_dataset,
'database_table': args.db_table,
'database_cred_path': args.db_cred_path,
'database_credentials': args.db_credentials,
'database_path': args.db_path,
'database_host': args.db_host,
'database_port': args.db_port,
@@ -95,7 +97,7 @@ def parse_arguments():
elif args.database == 'mysql':
num_extrinsics = len(json.loads(recent_blocks['extrinsics'].iloc[0]))
elif args.database == 'bigquery':
num_extrinsics = len(json.loads(recent_blocks['extrinsics'].iloc[0]))
num_extrinsics = len(recent_blocks['extrinsics'].iloc[0])
else:
num_extrinsics = 0 # Default value if database type is not recognized
extrinsics_col.metric("Extrinsics", num_extrinsics)
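Dropping json.loads in the bigquery branch reflects that the BigQuery client hands back repeated fields already deserialized, whereas Postgres and MySQL store them as JSON text. A shape-agnostic variant (a sketch, not the committed code) would be:

import json

row = recent_blocks['extrinsics'].iloc[0]
# str for postgres/mysql (JSON text), list/array for bigquery (already parsed).
num_extrinsics = len(json.loads(row)) if isinstance(row, str) else len(row)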
@@ -115,9 +117,9 @@
)
elif args.database == 'bigquery':
num_events = (
len(json.loads(recent_blocks['onfinalize'].iloc[0])['events']) +
len(json.loads(recent_blocks['oninitialize'].iloc[0])['events']) +
sum(len(ex['events']) for ex in json.loads(recent_blocks['extrinsics'].iloc[0]))
len(recent_blocks['onfinalize'].iloc[0]['events']) +
len(recent_blocks['oninitialize'].iloc[0]['events']) +
sum(len(ex['events']) for ex in recent_blocks['extrinsics'].iloc[0])
)
else:
num_events = 0 # Default value if database type is not recognized
Binary file removed ingest/__pycache__/database_utils.cpython-39.pyc
Binary file removed ingest/__pycache__/mysql_utils.cpython-39.pyc
Binary file removed ingest/__pycache__/write_block.cpython-39.pyc
14 changes: 7 additions & 7 deletions ingest/bigquery_utils.py
@@ -1,6 +1,7 @@
import os
from google.cloud import bigquery
from google.oauth2 import service_account
import json

def connect_to_bigquery(project_id, credentials_path):
"""
@@ -13,17 +14,16 @@ def connect_to_bigquery(project_id, credentials_path):
Returns:
google.cloud.bigquery.client.Client: A BigQuery client.
"""
if not os.path.exists(credentials_path):
raise FileNotFoundError(f"Credentials file not found at {credentials_path}")
# if not os.path.exists(credentials_path):
# raise FileNotFoundError(f"Credentials file not found at {credentials_path}")

credentials = service_account.Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
credentials = service_account.Credentials.from_service_account_info(
json.loads(credentials_path)
)

return bigquery.Client(credentials=credentials, project=project_id)
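Note that the parameter is still named credentials_path but now receives the key's JSON contents, which from_service_account_info parses directly. A usage sketch under that assumption:

import os

# The value comes from the DB_CREDENTIALS environment variable, not a file path.
client = connect_to_bigquery("my-gcp-project", os.environ["DB_CREDENTIALS"])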

def create_blocks_table(client, dataset_id, table_id):
def create_blocks_table(client, dataset_id, table_id, project_id):
"""
Create the blocks table if it doesn't exist.
@@ -98,7 +98,7 @@ def create_blocks_table(client, dataset_id, table_id):
])
]

table = bigquery.Table(f"{client.project}.{dataset_id}.{table_id}", schema=schema)
table = bigquery.Table(f"{project_id}.{dataset_id}.{table_id}", schema=schema)
table = client.create_table(table, exists_ok=True)
print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

6 changes: 3 additions & 3 deletions ingest/database_utils.py
@@ -21,7 +21,7 @@ def connect_to_database(database_info: Dict[str, Any]):
)
elif database_info['database'] == 'bigquery':
from bigquery_utils import connect_to_bigquery
return connect_to_bigquery(database_info['database_project'], database_info['database_cred_path'])
return connect_to_bigquery(database_info['database_project'], database_info['database_credentials'])
else:
raise ValueError(f"Unsupported database type: {database_info['database']}")

@@ -34,7 +34,7 @@ def create_tables(db_connection, database_info: Dict[str, Any], chain: str, rela
create_mysql_tables(db_connection, chain, relay_chain)
elif database_info['database'] == 'bigquery':
from bigquery_utils import create_blocks_table as create_bigquery_tables
create_bigquery_tables(db_connection, database_info['database_dataset'], database_info['database_table'])
create_bigquery_tables(db_connection, database_info['database_dataset'], database_info['database_table'], database_info['database_project'])
else:
raise ValueError(f"Unsupported database type: {database_info['database']}")

@@ -85,7 +85,7 @@ def query_last_block(db_connection, database_info: Dict[str, Any], chain: str, r
fetch_last_block_query = f"SELECT * FROM blocks_{relay_chain}_{chain} ORDER BY number DESC LIMIT 1"
else:
if database_info['database'] == 'bigquery':
fetch_last_block_query = f"SELECT * FROM {database_info['database_dataset']}.{database_info['database_table']} WHERE number={block_num} LIMIT 1"
fetch_last_block_query = f"SELECT * FROM {database_info['database_dataset']}.{database_info['database_table']} WHERE number='{block_num}' LIMIT 1"
elif database_info['database'] == 'postgres':
fetch_last_block_query = f"SELECT * FROM blocks_{relay_chain}_{chain} WHERE number='{block_num}' LIMIT 1"
else:
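The newly quoted literal in WHERE number='{block_num}' suggests the BigQuery blocks table stores number as a STRING, matching the postgres branch. A parameterized form (a sketch against the google-cloud-bigquery API) would avoid manual quoting altogether:

from google.cloud import bigquery

# client, dataset, table and block_num as in query_last_block above.
job = client.query(
    f"SELECT * FROM {dataset}.{table} WHERE number = @n LIMIT 1",
    job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("n", "STRING", str(block_num))]
    ),
)
rows = list(job.result())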
7 changes: 4 additions & 3 deletions ingest/docker-compose.yaml
@@ -26,10 +26,11 @@ services:
- DB_NAME=${DB_NAME}
- DB_USER=${DB_USER}
- DB_PASSWORD=${DB_PASSWORD}
- PROJECT_ID=${PROJECT_ID}
- DB_PROJECT=${DB_PROJECT}
- CREDENTIALS_PATH=${CREDENTIALS_PATH}
- DATASET=${DATASET}
- TABLE=${TABLE}
- DB_CREDENTIALS=${DB_CREDENTIALS}
- DB_DATASET=${DB_DATASET}
- DB_TABLE=${DB_TABLE}
- INGEST_MODE=${INGEST_MODE}
- START_BLOCK=${START_BLOCK}
- END_BLOCK=${END_BLOCK}
4 changes: 3 additions & 1 deletion ingest/main.py
@@ -20,10 +20,11 @@ def parse_arguments():
parser.add_argument("--db_path", required=True)
parser.add_argument("--db_project")
parser.add_argument("--db_cred_path")
parser.add_argument("--db_credentials")
parser.add_argument("--db_dataset")
parser.add_argument("--db_table")
parser.add_argument("--db_host", required=False, help="Database host")
parser.add_argument("--db_port", required=False, type=int, help="Database port")
parser.add_argument("--db_port", required=False, help="Database port")
parser.add_argument("--db_user", required=False, help="Database user")
parser.add_argument("--db_password", required=False, help="Database password")
parser.add_argument("--db_name", required=False, help="Database name")
@@ -39,6 +40,7 @@ def main():
'database_dataset': args.db_dataset,
'database_table': args.db_table,
'database_cred_path': args.db_cred_path,
'database_credentials': args.db_credentials,
'database_path': args.db_path,
'database_host': args.db_host,
'database_port': args.db_port,
14 changes: 8 additions & 6 deletions ingest/pages/Search.py
@@ -15,10 +15,11 @@ def parse_arguments():
parser.add_argument("--db_path")
parser.add_argument("--db_project")
parser.add_argument("--db_cred_path")
parser.add_argument("--db_credentials")
parser.add_argument("--db_dataset")
parser.add_argument("--db_table")
parser.add_argument("--db_host", required=False, help="Database host")
parser.add_argument("--db_port", required=False, type=int, help="Database port")
parser.add_argument("--db_port", required=False, help="Database port")
parser.add_argument("--db_user", required=False, help="Database user")
parser.add_argument("--db_password", required=False, help="Database password")
parser.add_argument("--db_name", required=False, help="Database name")
@@ -33,6 +34,7 @@ def parse_arguments():
'database_dataset': args.db_dataset,
'database_table': args.db_table,
'database_cred_path': args.db_cred_path,
'database_credentials': args.db_credentials,
'database_path': args.db_path,
'database_host': args.db_host,
'database_port': args.db_port,
@@ -83,7 +85,7 @@ def parse_arguments():
elif args.database == 'mysql':
extrinsics = pd.DataFrame(json.loads(result['extrinsics'].iloc[0]))
elif args.database == 'bigquery':
extrinsics = pd.DataFrame(json.loads(result['extrinsics'].iloc[0]))
extrinsics = pd.DataFrame(result['extrinsics'].iloc[0])
else:
extrinsics = pd.DataFrame() # Default empty DataFrame if database type is not recognized
st.dataframe(extrinsics)
@@ -102,9 +104,9 @@
] + json.loads(result['oninitialize'].iloc[0])['events'] + json.loads(result['onfinalize'].iloc[0])['events']
elif args.database == 'bigquery':
events = [
event for extrinsic in json.loads(result['extrinsics'].iloc[0])
event for extrinsic in result['extrinsics'].iloc[0]
for event in extrinsic['events']
] + json.loads(result['oninitialize'].iloc[0])['events'] + json.loads(result['onfinalize'].iloc[0])['events']
] + result['oninitialize'].iloc[0]['events'].tolist() + result['onfinalize'].iloc[0]['events'].tolist()
else:
events = [] # Default empty list if database type is not recognized
events = pd.DataFrame(events)
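The added .tolist() calls exist because the BigQuery client (via db-dtypes) returns repeated fields as numpy arrays, which cannot be concatenated with Python lists using +. A small defensive helper (a sketch) covers both shapes:

def as_list(value):
    # numpy arrays expose .tolist(); plain lists and other iterables pass through.
    return value.tolist() if hasattr(value, "tolist") else list(value)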
@@ -113,7 +115,7 @@
else:
st.warning(f"No block found with number {block_number}")

except ValueError:
st.error("Please enter a valid integer for the block number.")
# except ValueError:
# st.error("Please enter a valid integer for the block number.")
except Exception as e:
st.error(f"An error occurred: please refresh the page {e}")
4 changes: 3 additions & 1 deletion ingest/requirements.txt
@@ -1,8 +1,10 @@
requests==2.31.0
google-cloud-storage==2.7.0
google-cloud-bigquery==3.10.0
pandas==1.5.3
streamlit==1.38.0
streamlit-autorefresh==1.0.1
numpy==1.24.2
psycopg2-binary==2.9.9
mysql-connector-python==9.0.0
mysql-connector-python==9.0.0
db-dtypes==1.1.1
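db-dtypes is the pandas integration for google-cloud-bigquery: with it installed, to_dataframe() can map BigQuery column types, including repeated fields, onto pandas columns. A minimal sketch (dataset and table names are illustrative):

from google.cloud import bigquery

client = bigquery.Client()  # assumes ambient credentials, for the sketch only
df = client.query("SELECT * FROM my_dataset.blocks LIMIT 1").to_dataframe()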
4 changes: 2 additions & 2 deletions ingest/start-ingest.sh
@@ -26,12 +26,12 @@ echo "End Block: $END_BLOCK"

# Start the main.py script
echo "Starting main.py script..."
python3 main.py --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --wss "$WSS" --db_path "$DB_PATH" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" --db_host "$DB_HOST" --db_port "$DB_PORT" --db_user "$DB_USER" --db_password "$DB_PASSWORD" --db_name "$DB_NAME" --ingest_mode "$INGEST_MODE" --start_block "$START_BLOCK" --end_block "$END_BLOCK" 2>&1 &
python3 main.py --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --wss "$WSS" --db_path "$DB_PATH" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_credentials "$DB_CREDENTIALS" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" --db_host "$DB_HOST" --db_port "$DB_PORT" --db_user "$DB_USER" --db_password "$DB_PASSWORD" --db_name "$DB_NAME" --ingest_mode "$INGEST_MODE" --start_block "$START_BLOCK" --end_block "$END_BLOCK" 2>&1 &


# Start the Streamlit app
echo "Starting Streamlit app..."
python3 -m streamlit run Home.py --server.port 8501 -- --db_path "$DB_PATH" --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" --db_host "$DB_HOST" --db_port "$DB_PORT" --db_user "$DB_USER" --db_password "$DB_PASSWORD" --db_name "$DB_NAME" &
python3 -m streamlit run Home.py --server.port 8501 -- --db_path "$DB_PATH" --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_credentials "$DB_CREDENTIALS" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" --db_host "$DB_HOST" --db_port "$DB_PORT" --db_user "$DB_USER" --db_password "$DB_PASSWORD" --db_name "$DB_NAME" &

# Wait for all background processes to finish
wait
Binary file modified ingest/superset_home/superset.db
