Skip to content

Commit

Permalink
Update udp.py
Browse files Browse the repository at this point in the history
Added additional error handling for udp streams, and automatic retry for endpoint connections
  • Loading branch information
skhademcis authored Oct 24, 2023
1 parent 1e1e246 commit cb4477b
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions estreamer/streams/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#*********************************************************************/

import socket
import time
import estreamer.crossprocesslogging as logging
from estreamer.common.convert import isInt
from estreamer.streams.base import Base

Expand All @@ -38,26 +40,48 @@ def __init__( self, host, port, encoding = 'utf-8' ):
self.encoding = encoding
self.socket = None


self.logger = logging.getLogger( self.__class__.__name__ )

def __connect( self ):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.socket.connect( ( self.host, self.port) )


while True :
try:

self.logger.debug('Connecting to {0}:{1}'.format(self.host, self.port ))

self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.socket.connect((self.host,self.port))
break

except OSError as ose :

self.logger.error( "Socket Connection Error [{2}] - Cannot connect to host {0}:{1} - Retrying ...".format(self.host,self.port, ose))
time.sleep(2)

def close( self ):
try:
self.socket.shutdown( socket.SHUT_RDWR )
self.socket.close()
self.socket = None

except AttributeError:
pass

def write( self, data ):

while True :
if self.socket is None:
self.__connect()

else :
try:
self.logger.debug('Sending {2} to {0}:{1}'.format(self.host, self.port , data))
self.socket.sendall( data.encode( self.encoding ) )
break

def write( self, data ):
if self.socket is None:
self.__connect()
except OSError as ex:
self.logger.error("Error [{0}] writing to endpoint {1}:{2} -- Retrying...".format(ex, self.host, self.port))
time.sleep(1)
self.socket = None

self.socket.send( data.encode( self.encoding ) )

0 comments on commit cb4477b

Please sign in to comment.