Skip to content

Commit

Permalink
Merge branch 'feat/DEX-2853/item-caching' into 'master'
Browse files Browse the repository at this point in the history
[DEX-2853] feat: failed item caching

Closes DEX-2853

See merge request nstmrt/rubygems/outbox!116
  • Loading branch information
Сатаров Юрий Сергеевич committed Feb 7, 2025
2 parents 0449cd2 + edc39a3 commit 05be2f4
Show file tree
Hide file tree
Showing 11 changed files with 374 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .rubocop_todo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# Offense count: 1
# Configuration parameters: AllowedMethods, AllowedPatterns, CountRepeatedAttributes.
Metrics/AbcSize:
Max: 18
Max: 19

# Offense count: 5
# Configuration parameters: CountComments, CountAsOne, AllowedMethods, AllowedPatterns.
Expand Down
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [6.18.0] - 2025-02-04

### Added

- Add failed item caching in case of unrecoverable database conn issues
- Add `retry_latency` metric to measure retries

### Fixed

- Fix item processing cutoff timeout to be less than generic redis lock timeout

## [6.17.0] - 2025-01-30

### Added
Expand Down
72 changes: 68 additions & 4 deletions app/interactors/sbmt/outbox/process_item.rb
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
# frozen_string_literal: true

require "sbmt/outbox/metrics/utils"
require "sbmt/outbox/v2/redis_item_meta"

module Sbmt
module Outbox
class ProcessItem < Sbmt::Outbox::DryInteractor
param :item_class, reader: :private
param :item_id, reader: :private
option :worker_version, reader: :private, optional: true, default: -> { 1 }
option :cache_ttl_sec, reader: :private, optional: true, default: -> { 5 * 60 }
option :redis, reader: :private, optional: true, default: -> {}

METRICS_COUNTERS = %i[error_counter retry_counter sent_counter fetch_error_counter discarded_counter].freeze

delegate :log_success, :log_info, :log_failure, to: "Sbmt::Outbox.logger"
delegate :log_success, :log_info, :log_failure, :log_debug, to: "Sbmt::Outbox.logger"
delegate :item_process_middlewares, to: "Sbmt::Outbox"
delegate :box_type, :box_name, :owner, to: :item_class

attr_accessor :process_latency
attr_accessor :process_latency, :retry_latency

def call
log_success(
Expand All @@ -26,9 +29,23 @@ def call
item = nil

item_class.transaction do
item = yield fetch_item
item = yield fetch_item_and_lock_for_update

cached_item = fetch_redis_item_meta(redis_item_key(item_id))
if cached_retries_exceeded?(cached_item)
msg = "max retries exceeded: marking item as failed based on cached data: #{cached_item}"
item.set_errors_count(cached_item.errors_count)
track_failed(msg, item)
next Failure(msg)
end

if cached_greater_errors_count?(item, cached_item)
log_failure("inconsistent item: cached_errors_count:#{cached_item.errors_count} > db_errors_count:#{item.errors_count}: setting errors_count based on cached data:#{cached_item}")
item.set_errors_count(cached_item.errors_count)
end

if item.processed_at?
self.retry_latency = Time.current - item.created_at
item.config.retry_strategies.each do |retry_strategy|
yield check_retry_strategy(item, retry_strategy)
end
Expand Down Expand Up @@ -62,7 +79,48 @@ def call

private

def fetch_item
def cached_retries_exceeded?(cached_item)
return false unless cached_item

item_class.max_retries_exceeded?(cached_item.errors_count)
end

def cached_greater_errors_count?(db_item, cached_item)
return false unless cached_item

cached_item.errors_count > db_item.errors_count
end

def fetch_redis_item_meta(redis_key)
return if worker_version < 2

data = redis.call("GET", redis_key)
return if data.blank?

Sbmt::Outbox::V2::RedisItemMeta.deserialize!(data)
rescue => ex
log_debug("error while fetching redis meta: #{ex.message}")
nil
end

def set_redis_item_meta(item, ex)
return if worker_version < 2
return if item.nil?

redis_key = redis_item_key(item.id)
error_msg = format_exception_error(ex, extract_cause: false)
data = Sbmt::Outbox::V2::RedisItemMeta.new(errors_count: item.errors_count, error_msg: error_msg)
redis.call("SET", redis_key, data.to_s, "EX", cache_ttl_sec)
rescue => ex
log_debug("error while fetching redis meta: #{ex.message}")
nil
end

def redis_item_key(item_id)
"#{box_type}:#{item_class.box_name}:#{item_id}"
end

def fetch_item_and_lock_for_update
item = item_class
.lock("FOR UPDATE")
.find_by(id: item_id)
Expand Down Expand Up @@ -171,6 +229,7 @@ def track_failed(ex_or_msg, item = nil)
item.pending!
end
rescue => e
set_redis_item_meta(item, e)
log_error_handling_error(e, item)
end

Expand Down Expand Up @@ -259,6 +318,7 @@ def report_metrics(item)
end

track_process_latency(labels) if process_latency
track_retry_latency(labels) if retry_latency

return unless counters[:sent_counter].positive?

Expand All @@ -279,6 +339,10 @@ def counters
def track_process_latency(labels)
Yabeda.outbox.process_latency.measure(labels, process_latency.round(3))
end

def track_retry_latency(labels)
Yabeda.outbox.retry_latency.measure(labels, retry_latency.round(3))
end
end
end
end
22 changes: 17 additions & 5 deletions app/models/sbmt/outbox/base_item.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ def bucket_partitions
end
end
end

def max_retries_exceeded?(count)
return false if config.strict_order
return true unless retriable?

count > config.max_retries
end

def retriable?
config.max_retries > 0
end
end

enum :status, {
Expand Down Expand Up @@ -135,20 +146,21 @@ def touch_processed_at
end

def retriable?
config.max_retries > 0
self.class.retriable?
end

def max_retries_exceeded?
return false if config.strict_order
return true unless retriable?

errors_count > config.max_retries
self.class.max_retries_exceeded?(errors_count)
end

def increment_errors_counter
increment(:errors_count)
end

def set_errors_count(count)
self.errors_count = count
end

def add_error(ex_or_msg)
increment_errors_counter

Expand Down
6 changes: 6 additions & 0 deletions config/initializers/yabeda.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
buckets: [0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 20, 30].freeze,
comment: "A histogram for outbox/inbox deletion latency"

histogram :retry_latency,
tags: %i[type name partition owner],
unit: :seconds,
buckets: [1, 10, 20, 50, 120, 300, 900, 1800, 3600].freeze,
comment: "A histogram outbox retry latency"

counter :deleted_counter,
tags: %i[box_type box_name],
comment: "A counter for the number of deleted outbox/inbox items"
Expand Down
8 changes: 4 additions & 4 deletions lib/sbmt/outbox/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ class Engine < Rails::Engine
c.cdn_url = "https://cdn.jsdelivr.net/npm/sbmt-outbox-ui@0.0.8/dist/assets/index.js"
end
c.process_items = ActiveSupport::OrderedOptions.new.tap do |c|
c.general_timeout = 120
c.cutoff_timeout = 60
c.general_timeout = 180
c.cutoff_timeout = 90
c.batch_size = 200
end
c.worker = ActiveSupport::OrderedOptions.new.tap do |c|
Expand Down Expand Up @@ -54,8 +54,8 @@ class Engine < Rails::Engine
end
c.processor = ActiveSupport::OrderedOptions.new.tap do |pc|
pc.threads_count = 4
pc.general_timeout = 120
pc.cutoff_timeout = 60
pc.general_timeout = 180
pc.cutoff_timeout = 90
pc.brpop_delay = 1
end

Expand Down
16 changes: 13 additions & 3 deletions lib/sbmt/outbox/v2/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,24 @@ module Outbox
module V2
class Processor < BoxProcessor
delegate :processor_config, :batch_process_middlewares, :logger, to: "Sbmt::Outbox"
attr_reader :lock_timeout, :brpop_delay
attr_reader :lock_timeout, :cache_ttl, :cutoff_timeout, :brpop_delay

REDIS_BRPOP_MIN_DELAY = 0.1

def initialize(
boxes,
threads_count: nil,
lock_timeout: nil,
cache_ttl: nil,
cutoff_timeout: nil,
brpop_delay: nil,
redis: nil
)
@lock_timeout = lock_timeout || processor_config.general_timeout
@cache_ttl = cache_ttl || @lock_timeout * 10
@cutoff_timeout = cutoff_timeout || processor_config.cutoff_timeout
@brpop_delay = brpop_delay || redis_brpop_delay(boxes.count, processor_config.brpop_delay)
@redis = redis

super(boxes: boxes, threads_count: threads_count || processor_config.threads_count, name: "processor", redis: redis)
end
Expand Down Expand Up @@ -66,14 +71,19 @@ def lock_task(scheduled_task)
end

def process(task)
lock_timer = Cutoff.new(lock_timeout)
lock_timer = Cutoff.new(cutoff_timeout)
last_id = 0
strict_order = task.item_class.config.strict_order

box_worker.item_execution_runtime.measure(task.yabeda_labels) do
Outbox.database_switcher.use_master do
task.ids.each do |id|
result = ProcessItem.call(task.item_class, id, worker_version: task.yabeda_labels[:worker_version])
result = ProcessItem.call(
task.item_class, id,
worker_version: task.yabeda_labels[:worker_version],
cache_ttl_sec: cache_ttl,
redis: @redis
)

box_worker.job_items_counter.increment(task.yabeda_labels)
last_id = id
Expand Down
46 changes: 46 additions & 0 deletions lib/sbmt/outbox/v2/redis_item_meta.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# frozen_string_literal: true

module Sbmt
module Outbox
module V2
class RedisItemMeta
attr_reader :version, :timestamp, :errors_count, :error_msg

CURRENT_VERSION = 1
MAX_ERROR_LEN = 200

def initialize(errors_count:, error_msg:, timestamp: Time.current.to_i, version: CURRENT_VERSION)
@errors_count = errors_count
@error_msg = error_msg
@timestamp = timestamp
@version = version
end

def to_s
serialize
end

def serialize
JSON.generate({
version: version,
timestamp: timestamp,
errors_count: errors_count,
error_msg: error_msg.slice(0, MAX_ERROR_LEN)
})
end

def self.deserialize!(value)
raise "invalid data type: string is required" unless value.is_a?(String)

data = JSON.parse!(value, max_nesting: 1)
new(
version: data["version"],
timestamp: data["timestamp"].to_i,
errors_count: data["errors_count"].to_i,
error_msg: data["error_msg"]
)
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/sbmt/outbox/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module Outbox
VERSION = "6.17.0"
VERSION = "6.18.0"
end
end
Loading

0 comments on commit 05be2f4

Please sign in to comment.