Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stack level too deep (SystemStackError) #79

Open
ksimmi opened this issue Dec 13, 2024 · 6 comments
Open

stack level too deep (SystemStackError) #79

ksimmi opened this issue Dec 13, 2024 · 6 comments

Comments

@ksimmi
Copy link

ksimmi commented Dec 13, 2024

Hi.

I’m not sure how to describe my issue. A few months ago, I implemented the MVP with async-websocket, and everything worked fine. However, last night, after about six hours of the program running, I encountered a new error. This is something I haven’t seen before, and I’m unsure how to address it.

/usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:36:in `<<': stack level too deep (SystemStackError)
 from /usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:345:in `do_write'
 from /usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:471:in `flush'
 from /usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:411:in `write_nonblock'
 from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/buffered.rb:93:in `syswrite'
 from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:184:in `drain'
 from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:196:in `block in flush'
 from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:195:in `synchronize'
 from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:195:in `flush'
  ... 954 levels...
 from client.rb:78:in `keep_alive'
 from client.rb:68:in `block in run'
 from /usr/local/bundle/ruby/3.2.0/gems/async-2.21.1/lib/async/task.rb:197:in `block in run'
 from /usr/local/bundle/ruby/3.2.0/gems/async-2.21.1/lib/async/task.rb:435:in `block in schedule'

Thank you.

@ioquatix
Copy link
Member

Thanks for the report. Let me see if I can figure out what's going on.

@ioquatix
Copy link
Member

Are you able to share the code in client.rb?

@ksimmi
Copy link
Author

ksimmi commented Dec 13, 2024

client.rb

# frozen_string_literal: true

require 'bundler/setup'
require 'active_support'
require 'active_support/core_ext'
require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'
require 'karafka'

require_relative 'settings'

require_relative 'callable'
require_relative 'api_interactor'
require_relative 'api_interactor/http'
require_relative 'api_interactor/token_manager'

require 'pry'

MessagePack::DefaultFactory.register_type(
  MessagePack::Timestamp::TYPE, # or just -1
  Time,
  packer: MessagePack::Time::Packer,
  unpacker: MessagePack::Time::Unpacker
)


# class KafkaApp < Karafka::App
#   setup do |config|
#     config.client_id = 'my_application'
#     # librdkafka configuration options need to be set as symbol values
#     config.kafka = {
#       'bootstrap.servers': Settings.kafka_url
#     }
#   end
# end

PRODUCER = WaterDrop::Producer.new do |config|
  config.deliver = true
  config.kafka = {
    'bootstrap.servers': Settings.kafka_url,
    'enable.idempotence': true
  }
end

class MyClient

  def initialize
    @reconnection_attempts = 0
  end

  def run
    # get_sports

    WS.connect

    Async do
      WS.listen
    end

    Async do
      clear_expired
    end

    Async do
      WS.set_settings
      WS.subscribe
      keep_alive
    end

  rescue WS::ConnectionError => e
    @reconnection_attempts += 1
    Console.logger.warn(self, "reconnection attempts cause `Invalid Token`: #{@reconnection_attempts}")
    retry
  end

  def keep_alive
    loop do
      WS.echo
      @hits ||= 0
      @hits += 1
      PRODUCER.produce_async(topic: 'pong', payload: { hits: @hits }.to_json)

      sleep(1)
    end
  end

  def clear_expired
    loop do
      WS.clear_expired
      sleep(10)
    end
  end
end


Async do |task|
  MyClient.new.run
end

@ioquatix
Copy link
Member

ioquatix commented Dec 14, 2024

Are you able to tell me what is WS? And what is the implementation of WS.echo?

@ksimmi
Copy link
Author

ksimmi commented Dec 16, 2024

class WS
  ConnectionError = Class.new(StandardError)

  class << self

    def token_manager
      @token_manager ||= ApiInteractor::TokenManager.new(Settings.ws_identity_srv_url)
    end

    def connect
      @queue = TTLHash.new

      @connection = Async::WebSocket::Client.connect(
        Async::HTTP::Endpoint.parse(
          Settings.ws_api_url_url,
          alpn_protocols: Async::HTTP::Protocol::HTTP11.names
        ),
        headers: { Authorization: token_manager.fetch }
      )
      rescue Async::WebSocket::ConnectionError => e
        Console.logger.error(e)
        Console.logger.error(e.response) if e.response

        raise ConnectionError
    end

    def set_settings
      set_settings_response = call_command(SetSettings)
      get_settings_response = call_command(GetSettings)
    end

    def subscribe
      subscribe_response = call_command(Subscribe)
      get_subscriptions_response = call_command(GetSubscriptions)
      # unsubscribe_response = call_command(Unsubscribe)
    end

    def echo
      call_command(Echo)
    end

    def clear_expired
      @queue.clear_expired
    end

    def call_command(cmd)
      msg = cmd.to_message

      @connection.write(msg.body)
      @connection.flush

      @queue.set(msg.message_id, ttl: cmd.await_time_secs)

      debug_info = {
        cmd_name: cmd.usecase_name,
        bytes: msg.body,
        payload: msg.payload,
        hex: msg.debug_as_hex
      }

      # binding.pry if cmd == GetSettings

      while @queue.queued?(msg.message_id)
        Console.logger.info("#{cmd.usecase_name}[#{cmd.dgn_api_code}] message awaiting #{msg.message_id}")

        if @queue.expired?(msg.message_id)
          Console.logger.info("#{cmd.usecase_name}[#{cmd.dgn_api_code}] message expired  #{msg.message_id}")

          @queue.dequeue(msg.message_id)
          call_command(cmd)
        end

        if response = @queue.get(msg.message_id)
          if response&.message
            debug_info.merge!(response: response.message)
          end

          Console.logger.info("#{cmd.usecase_name}[#{cmd.dgn_api_code}] message handled  #{msg.message_id}")
          return @queue.dequeue(msg.message_id)
        end

        sleep(cmd.await_time_secs)
      end

    rescue IOError => e
      Console.logger.error("[WS] #{e.class} #{e.message}")
      raise e

    rescue => e
      binding.pry
    end

    def listen
      @results = {}
      @file_counts = 0

      while message = @connection.read
        msg = InboxHandler.new(Message.unwrap(message.buffer))

        case
        when msg.command_response?
          @queue.set(msg.message_id, msg)

        when msg.broadcast?
          handle_broadcast(msg)

        else
          Console.logger.info("UNHANDLED MESSAGE TYPE RT=#{msg.rt_code} #{msg.type} #{msg} #{msg&.data}")
          # raise "UNHANDLED MESSAGE #{msg} #{msg&.data}"
        end
      end
    rescue => e
      # binding.pry
    end

    def handle_broadcast(msg)
      case
      when msg.matches_broadcast?
        data = WS::Broadcast::Match.handle(msg.data)

        @match_id ||= data[:id]

        Console.logger.info("#{msg.type} RT=#{msg.rt_code} MATCH_ID=#{@match_id}") if @match_id == data[:id]

      when msg.stakes_broadcast?
        data = WS::Broadcast::Stake.handle(msg.data)

        @match_id = nil
        @match_id ||= data.first[:event_id]

        Console.logger.info("#{msg.type} RT=#{msg.rt_code} MATCH_ID=#{@match_id}") if @match_id == data.first[:event_id]

      when msg.sports_broadcast?
        WS::Broadcast::Sport.handle(msg.data)
        Console.logger.info("#{msg.type} RT=#{msg.rt_code}")

      when msg.championship_broadcast?
        WS::Broadcast::Championship.handle(msg.data)
        Console.logger.info("#{msg.type} RT=#{msg.rt_code}")

      when msg.tournaments_broadcast?
        WS::Broadcast::Tournament.handle(msg.data)
        Console.logger.info("#{msg.type} RT=#{msg.rt_code}")

      else
        Console.logger.info("UNHANDLED BROADCAST TYPE RT=#{msg.rt_code} #{msg.type} #{msg} #{msg&.data}")
        # raise "UNHANDLED BROADCAST TYPE RT=#{msg.rt_code} #{msg} #{msg&.data}"
      end

      rescue => e
        # binding.pry
      end
  end
end

@ksimmi
Copy link
Author

ksimmi commented Jan 17, 2025

The problem occurs intermittently.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants