Skip to content

Commit

Permalink
Passes metadata to destinations.
Browse files Browse the repository at this point in the history
  • Loading branch information
atompie committed Jul 15, 2024
1 parent 065de79 commit 1ba417a
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 25 deletions.
6 changes: 4 additions & 2 deletions tracardi/process_engine/destination/civicrm_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ async def _dispatch(self, data):

await client.add_contact(data)

async def dispatch_profile(self, data, profile: Optional[Profile], session: Optional[Session], changed_fields: List[dict]=None):
async def dispatch_profile(self, data, profile: Optional[Profile], session: Optional[Session],
changed_fields: List[dict] = None, metadata=None):
await self._dispatch(data)

async def dispatch_event(self, data, profile: Optional[Profile], session: Optional[Session], event: Event):
async def dispatch_event(self, data, profile: Optional[Profile], session: Optional[Session], event: Event,
metadata=None):
await self._dispatch(data)
7 changes: 4 additions & 3 deletions tracardi/process_engine/destination/destination_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ def __init__(self, debug: bool, resource: Resource, destination: Destination):
self.debug = debug
self.resource = resource

async def dispatch_profile(self, data, profile: Profile, session: Optional[Session], changed_fields: List[dict] = None):
async def dispatch_profile(self, data, profile: Profile, session: Optional[Session],
changed_fields: List[dict] = None, metadata=None):
pass

async def dispatch_event(self, data, profile: Optional[Profile], session: Optional[Session], event: Event):
async def dispatch_event(self, data, profile: Optional[Profile], session: Optional[Session], event: Event,
metadata=None):
pass


def _get_credentials(self):
return self.resource.credentials.test if self.debug else self.resource.credentials.production
8 changes: 5 additions & 3 deletions tracardi/process_engine/destination/ghost_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class GhostCredentials(BaseModel):
api_url: str
api_key: Optional[str] = None


class GhostConnector(DestinationInterface):

def _dispatch(self, payload):
Expand All @@ -24,14 +25,15 @@ def _dispatch(self, payload):

init = self.destination.destination.init

#TODO Finish.
# TODO Finish.

except Exception as e:
logger.error(str(e))
raise e

async def dispatch_profile(self, mapped_data, profile: Profile, session: Session, changed_fields: List[dict]=None):
async def dispatch_profile(self, mapped_data, profile: Profile, session: Session, changed_fields: List[dict] = None,
metadata=None):
self._dispatch(payload=mapped_data)

async def dispatch_event(self, mapped_data, profile: Profile, session: Session, event: Event):
async def dispatch_event(self, mapped_data, profile: Profile, session: Session, event: Event, metadata=None):
self._dispatch(payload=mapped_data)
8 changes: 5 additions & 3 deletions tracardi/process_engine/destination/http_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def get_params(self, body: dict) -> dict:

if self.method.lower() == 'get':
params = flatten(body)
params = {key: self._convert_params(value) for key,value in params.items() if value is not None}
params = {key: self._convert_params(value) for key, value in params.items() if value is not None}
return {
"params": params
}
Expand Down Expand Up @@ -132,8 +132,10 @@ async def _dispatch(self, data, changed_fields):
logger.error(str(e), e, exc_info=True)
raise e

async def dispatch_profile(self, data, profile: Profile, session: Optional[Session], changed_fields: List[dict]=None):
async def dispatch_profile(self, data, profile: Profile, session: Optional[Session],
changed_fields: List[dict] = None, metadata=None):
await self._dispatch(data, changed_fields)

async def dispatch_event(self, data, profile: Optional[Profile], session: Optional[Session], event: Event):
async def dispatch_event(self, data, profile: Optional[Profile], session: Optional[Session], event: Event,
metadata=None):
await self._dispatch(data, [])
5 changes: 3 additions & 2 deletions tracardi/process_engine/destination/hubspot_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ async def _dispatch(self, data: dict, profile: Profile): # Data comes from mapp
# Try to update
await self._update_contact(payload, profile.id, hubspot_id)

async def dispatch_profile(self, data: dict, profile: Profile, session: Session, changed_fields: List[dict]=None):
async def dispatch_profile(self, data: dict, profile: Profile, session: Session, changed_fields: List[dict] = None,
metadata=None):
await self._dispatch(data, profile)

async def dispatch_event(self, data: dict, profile: Profile, session: Session, event: Event):
async def dispatch_event(self, data: dict, profile: Profile, session: Session, event: Event, metadata=None):
await self._dispatch(data, profile)
5 changes: 3 additions & 2 deletions tracardi/process_engine/destination/mautic_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ async def _dispatch(self, data):

await resource_db.save_record(self.resource)

async def dispatch_profile(self, data, profile: Profile, session: Session, changed_fields: List[dict]=None):
async def dispatch_profile(self, data, profile: Profile, session: Session, changed_fields: List[dict] = None,
metadata=None):
await self._dispatch(data)

async def dispatch_event(self, data, profile: Profile, session: Session, event: Event):
async def dispatch_event(self, data, profile: Profile, session: Session, event: Event, metadata=None):
await self._dispatch(data)
23 changes: 18 additions & 5 deletions tracardi/service/destination/dispatchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
async def event_destination_dispatch(profile: Optional[Profile],
session: Optional[Session],
events: List[Event],
debug):
debug,
metadata=None
):
dot = DotAccessor(profile, session)
for event in events:
try:
Expand All @@ -32,7 +34,11 @@ async def event_destination_dispatch(profile: Optional[Profile],

async for destination_instance, reshaped_data in get_dispatch_destination_and_data(dot, destinations,
debug):
await destination_instance.dispatch_event(reshaped_data, profile=profile, session=session, event=event)
await destination_instance.dispatch_event(reshaped_data,
profile=profile,
session=session,
event=event,
metadata=metadata)
except Exception as e:
logger.error(
str(e),
Expand All @@ -47,18 +53,26 @@ async def event_destination_dispatch(profile: Optional[Profile],
)
)


async def profile_destination_dispatch(profile: Optional[Profile],
session: Optional[Session],
changed_fields: List[dict],
debug: bool): # debug is used to find out which resource to use.
debug: bool,
metadata: dict = None): # debug is used to find out which resource to use.

dot = DotAccessor(profile, session)
destinations: List[Destination] = await load_profile_destinations()

async for destination_instance, reshaped_data in get_dispatch_destination_and_data(dot, destinations, debug):
try:
logger.info(f"Dispatching {destination_instance}. Profile id: {get_entity_id(profile)}.")
await destination_instance.dispatch_profile(reshaped_data, profile=profile, session=session, changed_fields=changed_fields)
await destination_instance.dispatch_profile(
reshaped_data,
profile=profile,
session=session,
changed_fields=changed_fields,
metadata=metadata
)
except Exception as e:
logger.error(
str(e),
Expand All @@ -72,4 +86,3 @@ async def profile_destination_dispatch(profile: Optional[Profile],
traceback=get_traceback(e)
)
)

20 changes: 15 additions & 5 deletions tracardi/service/tracking/destination/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
async def sync_profile_destination(profile: Optional[Profile], session: Session, changed_fields: List[dict]):
has_profile = isinstance(profile, Profile)
if has_profile and tracardi.enable_profile_destinations and profile.has_not_saved_changes():
await profile_destination_dispatch(profile=profile,
session=session,
changed_fields=changed_fields,
debug=False)
await profile_destination_dispatch(
profile=profile,
session=session,
changed_fields=changed_fields,
debug=False,
metadata={
"source": "collector",
"mode": "sync"
}
)


async def sync_event_destination(profile: Optional[Profile], session: Session, events: List[Event], debug):
Expand All @@ -28,5 +34,9 @@ async def sync_event_destination(profile: Optional[Profile], session: Session, e
profile,
session,
events,
debug
debug,
metadata={
"source": "collector",
"mode": "sync"
}
)

0 comments on commit 1ba417a

Please sign in to comment.