forked from dlt-hub/verified-sources
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipedrive_pipeline.py
70 lines (59 loc) · 2.45 KB
/
pipedrive_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import dlt
from pipedrive import pipedrive_source
def load_pipedrive() -> None:
    """Run the complete Pipedrive source through a DuckDB pipeline.

    Prints the load info returned by the run, followed by the normalize
    info recorded in the pipeline's last trace.
    """
    # Destination and dataset are fixed here; adjust them for your own setup.
    pipedrive_pipeline = dlt.pipeline(
        pipeline_name="pipedrive",
        destination="duckdb",
        dataset_name="pipedrive_data",
    )

    info = pipedrive_pipeline.run(pipedrive_source())
    print(info)

    trace = pipedrive_pipeline.last_trace
    print(trace.last_normalize_info)
def load_selected_data() -> None:
    """Load only a chosen subset of resources via `with_resources`.

    After the load, a second source instance is created purely to
    demonstrate how a source and its resources can be inspected.
    """
    pipedrive_pipeline = dlt.pipeline(
        pipeline_name="pipedrive",
        destination="duckdb",
        dataset_name="pipedrive_data",
    )

    # Entities to load. `custom_fields_mapping` has to be part of the
    # selection so custom field hashes get translated to their names.
    wanted = ("products", "deals", "deals_participants", "custom_fields_mapping")
    info = pipedrive_pipeline.run(pipedrive_source().with_resources(*wanted))
    print(info)

    # The rest only illustrates how to reach resources inside a source.
    inspected = pipedrive_source()

    # source summary
    print(inspected)
    print()

    # names of all resources
    print(inspected.resources.keys())
    print()

    # the `persons` resource, looked up by key
    print(inspected.resources["persons"])
    print()

    # the same resource, reached as an attribute
    print(inspected.persons)
def load_from_start_date() -> None:
    """Load activities from a given timestamp onward, alongside everything else.

    Two source instances are run together: one with `activities`
    deselected, and one created with `since_timestamp` and restricted to
    `activities` plus `custom_fields_mapping`.
    """
    pipedrive_pipeline = dlt.pipeline(
        pipeline_name="pipedrive",
        destination="duckdb",
        dataset_name="pipedrive_data",
    )

    # Source 1: default configuration, with the activities resource switched off.
    everything_else = pipedrive_source()
    everything_else.resources["activities"].selected = False

    # Source 2: activities from the given timestamp; custom_fields_mapping
    # is kept so custom field hashes are translated to names.
    recent_activities = pipedrive_source(since_timestamp="2023-03-01 00:00:00Z")
    recent_activities = recent_activities.with_resources(
        "activities", "custom_fields_mapping"
    )

    # Both sources go through a single pipeline run.
    info = pipedrive_pipeline.run([everything_else, recent_activities])
    print(info)
if __name__ == "__main__":
    # Default example: load everything.
    load_pipedrive()

    # Alternative: load a subset of tables and print resource details.
    # load_selected_data()

    # Alternative: load activities updated since a given date.
    # load_from_start_date()