Skip to content

Commit

Permalink
Merge branch '0.8.2-dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
atompie committed Jan 18, 2024
2 parents 19613fb + 96c0e85 commit 0560e5c
Show file tree
Hide file tree
Showing 85 changed files with 15,161 additions and 1,043 deletions.
54 changes: 54 additions & 0 deletions test/integration/test_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from time import sleep

from tracardi.context import ServerContext, Context
from tracardi.service.storage.redis_client import RedisClient
from tracardi.service.tracking.locking import Lock, RELEASED


def test_lock_ttl():
    """A lock created with a 1-second TTL must expire on its own in Redis."""
    with ServerContext(Context(production=False)):
        client = RedisClient()
        lock_key = Lock.get_key("namespace:", "profile", "10")
        short_lock = Lock(client, lock_key, default_lock_ttl=1)

        # A freshly created lock starts out released.
        assert short_lock.is_locked() is False

        short_lock.lock("name1")
        assert short_lock.is_locked() is True

        # Sleep past the TTL; the key should be dropped by Redis itself.
        sleep(2)
        assert short_lock.is_locked() is False

        # Explicit cleanup so the shared key does not leak to other tests.
        short_lock.unlock()

def test_lock_expires():
    """Locking must not alter the TTL that the Lock was configured with."""
    with ServerContext(Context(production=False)):
        redis = RedisClient()
        key = Lock.get_key("namespace:", "profile", "10")
        lock = Lock(redis, key, default_lock_ttl=10)

        # Before locking: released, and the configured TTL is reported as-is.
        assert lock.state == RELEASED
        assert lock.ttl == 10

        # Acquiring the lock must keep the same TTL.
        lock.lock('name1')
        assert lock.ttl == 10

        # Fix: release the lock. The original test left the shared key
        # (same namespace/entity/id as the other tests in this file) locked
        # for 10 s, which makes test_lock_global_value's initial
        # `not is_locked()` assertion order/timing dependent.
        lock.unlock()


def test_lock_global_value():
    """Two Lock objects sharing one Redis key must observe identical state."""
    with ServerContext(Context(production=False)):
        client = RedisClient()
        shared_key = Lock.get_key("namespace:", "profile", "10")

        first = Lock(client, shared_key, default_lock_ttl=1)
        assert not first.is_locked()

        first.lock("name1")

        # A second handle on the same key sees the lock taken via the first.
        second = Lock(client, shared_key, default_lock_ttl=10)
        assert second.is_locked()
        assert first.get_locked_inside() == second.get_locked_inside()
        assert first.state == second.state

        # Breaking in keeps the lock held and both handles stay in sync.
        first.break_in()
        assert second.is_locked()
        assert first.get_locked_inside() == second.get_locked_inside()
        assert first.state == second.state

        # Unlocking through either handle releases the shared lock for both.
        second.unlock()

        assert not first.is_locked()
        assert first.get_locked_inside() == second.get_locked_inside()
        assert first.state == second.state
181 changes: 181 additions & 0 deletions test/integration/test_mutex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import threading

from time import sleep

import pytest

from tracardi.context import ServerContext, Context
from tracardi.service.storage.redis_client import RedisClient
from tracardi.service.tracking.locking import Lock, BROKE, LOCKED, mutex


def test_mutex_on_error_must_be_unlocked():
    """The mutex context manager must release the lock on error and on success."""
    with ServerContext(Context(production=False)):
        client = RedisClient()
        mutex_key = Lock.get_key("namespace:", "profile", "10")
        guard = Lock(client, mutex_key, default_lock_ttl=100)

        with pytest.raises(ValueError):
            with mutex(guard, name="task1") as _lock:
                # While inside the context the lock is held.
                assert guard.is_locked() is True
                raise ValueError("test")

        # The exception must not leave the lock behind.
        assert guard.is_locked() is False

        with mutex(guard, name="task2") as _lock:
            # Held again for the duration of the context.
            assert guard.is_locked() is True

        # Normal exit releases it as well.
        assert guard.is_locked() is False

def test_mutex_multithread_lock_break():
    """task2 must break into task1's mutex after `break_after_time` elapses."""
    state = []
    # Fix: collect worker-thread exceptions. In the original, an assertion
    # failing inside a thread was only printed and re-raised in that thread,
    # which pytest never sees — the test then failed (or passed) on an opaque
    # `state` mismatch instead of the real error.
    errors = []

    with ServerContext(Context(production=False)):
        redis = RedisClient()
        key = Lock.get_key("namespace:", "profile", "10")
        lock = Lock(redis, key, default_lock_ttl=100)

        def task1():
            try:
                with ServerContext(Context(production=False)):
                    with mutex(lock, name="task1") as _lock:
                        # We hold the mutex, so state must be LOCKED.
                        assert _lock.state == LOCKED
                        sleep(2)
                        state.append(1)
            except Exception as e:
                errors.append(e)

        def task2():
            try:
                with ServerContext(Context(production=False)):
                    assert lock.is_locked()
                    # Break task1's lock after waiting at most 1 s.
                    with mutex(lock, name="task2", break_after_time=1) as _lock:
                        assert _lock.is_locked() is True
                        assert _lock.state == BROKE
                        state.append(2)
            except Exception as e:
                errors.append(e)

        thread1 = threading.Thread(target=task1)
        thread2 = threading.Thread(target=task2)

        # Start task1 first, give it a head start to acquire the lock.
        thread1.start()
        sleep(.2)
        thread2.start()

        # Join threads to wait for them to finish
        thread1.join()
        thread2.join()

        # Surface the original worker failure instead of a bare state mismatch.
        assert not errors, errors
        # task2 broke in and finished first; task1 completed afterwards.
        assert state == [2, 1]

def test_mutex_multithread_lock_with_waiting():
    """task2 must wait until task1 releases the mutex, then run after it."""
    state = []
    # Fix: collect worker-thread exceptions so pytest reports the real
    # failure; exceptions raised inside threads are otherwise invisible.
    errors = []

    with ServerContext(Context(production=False)):
        redis = RedisClient()
        key = Lock.get_key("namespace:", "profile", "10")
        lock = Lock(redis, key, default_lock_ttl=3)

        def task1():
            try:
                with ServerContext(Context(production=False)):
                    # Task 1 holds the mutex for ~1 s.
                    with mutex(lock, name="task1") as _lock:
                        sleep(1)
                        state.append(1)
            except Exception as e:
                errors.append(e)

        def task2():
            try:
                with ServerContext(Context(production=False)):
                    assert lock.is_locked()
                    # Blocks here until task1 releases, then holds it itself.
                    with mutex(lock, name="task2") as _lock:
                        sleep(1)
                        state.append(2)
            except Exception as e:
                errors.append(e)

        thread1 = threading.Thread(target=task1)
        thread2 = threading.Thread(target=task2)

        # Start task1 first and give it time to acquire the lock.
        thread1.start()
        sleep(.2)
        thread2.start()

        # Join threads to wait for them to finish
        thread1.join()
        thread2.join()

        # Surface the original worker failure instead of a bare state mismatch.
        assert not errors, errors
        # Waiting preserves ordering: task1 finishes before task2 runs.
        assert state == [1, 2]


def test_mutex_multithread_lock_with_skip():
    """`raise_error_when_locked` must fail fast; after the TTL the mutex opens."""
    state = []
    # Fix: collect worker-thread exceptions so pytest reports the real
    # failure; exceptions raised inside threads are otherwise invisible.
    errors = []

    with ServerContext(Context(production=False)):
        redis = RedisClient()
        key = Lock.get_key("namespace:", "profile", "10")
        lock = Lock(redis, key, default_lock_ttl=1)

        def task1():
            try:
                with ServerContext(Context(production=False)):
                    # Hold the mutex for ~1 s (the TTL is also 1 s).
                    with mutex(lock, name="task1") as _lock:
                        assert lock.is_locked()
                        sleep(1)
                        state.append(1)
            except Exception as e:
                errors.append(e)

        def task2():
            try:
                with ServerContext(Context(production=False)):
                    assert lock.is_locked()

                    # Must raise immediately while task1 still holds the lock.
                    with pytest.raises(BlockingIOError):
                        with mutex(lock, name="task2", raise_error_when_locked=True):
                            state.append(2)  # never reached

                    # Wait 2 s (longer than task1's 1 s lock/TTL).
                    sleep(2)

                    # Should not be locked any more.
                    assert not lock.is_locked()

                    # Now the mutex can be acquired normally.
                    with mutex(lock, name="task2") as _lock:
                        assert _lock.is_locked()
                        state.append(3)
            except Exception as e:
                errors.append(e)

        thread1 = threading.Thread(target=task1)
        thread2 = threading.Thread(target=task2)

        # Start task1 first and give it time to acquire the lock.
        thread1.start()
        sleep(.2)
        thread2.start()

        # Join threads to wait for them to finish
        thread1.join()
        thread2.join()

        # Surface the original worker failure instead of a bare state mismatch.
        assert not errors, errors
        # The fast-fail attempt never appended 2; only 1 and 3 are recorded.
        assert state == [1, 3]
4 changes: 2 additions & 2 deletions test/unit/plugins/test_plugin_new_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def test_plugin_new_profile_true():
init = {}
payload = {}
profile = Profile(id="1")
profile.operation.new = True
profile.set_new()
event = Event(
id='1',
type='text',
Expand All @@ -28,7 +28,7 @@ def test_plugin_new_profile_false():
init = {}
payload = {}
profile = Profile(id="1")
profile.operation.new = False
profile.new(False)
event = Event(
id='1',
type='text',
Expand Down
4 changes: 2 additions & 2 deletions test/unit/plugins/test_plugin_new_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def test_plugin_new_visit_true():
init = {}
payload = {}
session = Session(id="1", metadata=SessionMetadata())
session.operation.new = True
session.set_new()

result = run_plugin(NewVisitAction, init, payload, session=session)
assert result.output.value == payload
Expand All @@ -18,7 +18,7 @@ def test_plugin_new_visit_false():
init = {}
payload = {}
session = Session(id="1", metadata=SessionMetadata())
session.operation.new = False
session.set_new(False)

result = run_plugin(NewVisitAction, init, payload, session=session)
assert result.output.value == payload
Expand Down
5 changes: 5 additions & 0 deletions test/unit/test_dotty.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ def test_none_to_dotty():
a = dotty(None)
assert bool(a) is False

def test_list_in_dotty():
a = dotty({"id":"a", "ids": ["a"]})
a['ids'].append("b")
assert len(a['ids']) == 2
assert a.to_dict()['ids'] == a['ids']


def test_spaces():
Expand Down
2 changes: 1 addition & 1 deletion test/unit/test_frozen_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
def test_frozen_session():
with ServerContext(Context(production=True)):
session = FrozenSession(**Session.new().model_dump())
assert session.operation.new is True
assert session.is_new()
with pytest.raises(ValidationError):
session.id = "1"
12 changes: 6 additions & 6 deletions test/unit/test_id_hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
PREFIX_PHONE_WHATSUP
from tracardi.service.utils.hasher import hash_id

tracardi.hash_id_webhook = "abc"
tracardi.auto_profile_merging = "abc"

def test_returns_string_with_length_40():
value = "test"
Expand All @@ -32,7 +32,7 @@ def test_add_hashed_ids_with_existing_email_ids():
)

# Invoke method
profile.add_hashed_ids()
profile.create_auto_merge_hashed_ids()

# Assertions
assert profile.has_hashed_email_id(PREFIX_EMAIL_BUSINESS) is True
Expand All @@ -57,7 +57,7 @@ def test_add_hashed_ids_with_existing_phone_ids():
)

# Invoke method
profile.add_hashed_ids()
profile.create_auto_merge_hashed_ids()

# Assertions
assert profile.has_hashed_phone_id(PREFIX_PHONE_BUSINESS) is True
Expand All @@ -71,7 +71,7 @@ def test_add_hashed_ids_with_no_existing_ids():
profile = Profile(id='1')

# Invoke method
profile.add_hashed_ids()
profile.create_auto_merge_hashed_ids()

# Assertions
assert profile.has_hashed_email_id(PREFIX_EMAIL_BUSINESS) is False
Expand Down Expand Up @@ -99,7 +99,7 @@ def test_add_hashed_ids_with_empty_email_ids():
)

# Invoke method
profile.add_hashed_ids()
profile.create_auto_merge_hashed_ids()

# Assertions
assert profile.has_hashed_email_id(PREFIX_EMAIL_BUSINESS) is False
Expand All @@ -124,7 +124,7 @@ def test_add_hashed_ids_with_empty_phone_ids():
)

# Invoke method
profile.add_hashed_ids()
profile.create_auto_merge_hashed_ids()

# Assertions
assert profile.has_hashed_phone_id(PREFIX_PHONE_BUSINESS) is False
Expand Down
12 changes: 5 additions & 7 deletions tracardi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ def __init__(self, env):

self.skip_errors_on_profile_mapping = env.get('SKIP_ERRORS_ON_PROFILE_MAPPING', 'no').lower() == 'yes'

self.lock_on_data_computation = env.get('LOCK_ON_DATA_COMPUTATION', 'yes') == 'yes'

# Temporary flag
self.new_collector = env.get('NEW_COLLECTOR', 'yes').lower() == 'yes'

Expand Down Expand Up @@ -200,25 +198,25 @@ def __init__(self, env):
self.console_log_partitioning = env.get('CONSOLE_LOG_PARTITIONING', 'month')
self.user_log_partitioning = env.get('USER_LOG_PARTITIONING', 'year')
self.field_change_log_partitioning = env.get('FIELD_CHANGE_LOG_PARTITIONING', 'month')
self.hash_id_webhook = env.get('HASH_ID_WEBHOOK', None)
self.auto_profile_merging = env.get('AUTO_PROFILE_MERGING', 's>a.d-kljsa87^5adh')

self._config = None
self._unset_secrets()

if self.multi_tenant and (self.multi_tenant_manager_url is None or self.multi_tenant_manager_api_key is None):
if self.multi_tenant_manager_url is None:
logger.warning('No MULTI_TENANT_MANAGER_URL set for MULTI_TENANT mode. Either set '
'the MULTI_TENANT_MANAGER_URL or set MULTI_TENANT to "no"')
'the MULTI_TENANT_MANAGER_URL or set MULTI_TENANT to "no"')

if self.multi_tenant_manager_api_key is None:
logger.warning('No MULTI_TENANT_MANAGER_API_KEY set for MULTI_TENANT mode. Either set '
'the MULTI_TENANT_MANAGER_API_KEY or set MULTI_TENANT to "no"')
'the MULTI_TENANT_MANAGER_API_KEY or set MULTI_TENANT to "no"')

if self.multi_tenant and not is_valid_url(self.multi_tenant_manager_url):
logger.warning('Env MULTI_TENANT_MANAGER_URL is not valid URL.')

if self.hash_id_webhook and len(self.hash_id_webhook) < 20:
logger.warning('Security risk. Env HASH_ID_WEBHOOK is too short. It must be at least 20 chars long.')
if self.auto_profile_merging and len(self.auto_profile_merging) < 20:
logger.warning('Security risk. Env AUTO_PROFILE_MERGING is too short. It must be at least 20 chars long.')



Expand Down
Loading

0 comments on commit 0560e5c

Please sign in to comment.