Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: deal with socket disposed exception #245

Merged
merged 1 commit into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 50 additions & 32 deletions src/Enyim.Caching/Memcached/MemcachedNode.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using Enyim.Caching.Configuration;
using Enyim.Caching.Memcached.Protocol.Binary;
using Enyim.Caching.Memcached.Results;
using Enyim.Caching.Memcached.Results.Extensions;
using Enyim.Collections;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
Expand All @@ -12,8 +10,6 @@
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Runtime.Serialization;
using System.Security;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -899,49 +895,71 @@ protected internal virtual PooledSocket CreateSocket()
{
try
{
var ps = new PooledSocket(_endPoint, _config.ConnectionTimeout, _config.ReceiveTimeout, _logger,
#if NET5_0_OR_GREATER
_useSslStream, _useIPv6, _sslClientAuthOptions);
#else
_useSslStream, _useIPv6);
#endif
ps.Connect();
return ps;
return CreateSocketInternal();
}
catch (Exception ex)
catch
{
_logger.LogError(ex, $"Create {nameof(PooledSocket)}");
throw;
try
{
return CreateSocketInternal();
}
catch (Exception ex)
{
LogCreateSocketError(ex, nameof(CreateSocket));
throw;
}
}
}

protected internal virtual async Task<PooledSocket> CreateSocketAsync()
private PooledSocket CreateSocketInternal()
{
try
{
var ps = new PooledSocket(_endPoint, _config.ConnectionTimeout, _config.ReceiveTimeout, _logger,
var ps = new PooledSocket(_endPoint, _config.ConnectionTimeout, _config.ReceiveTimeout, _logger,
#if NET5_0_OR_GREATER
_useSslStream, _useIPv6, _sslClientAuthOptions);
_useSslStream, _useIPv6, _sslClientAuthOptions);
#else
_useSslStream, _useIPv6);
_useSslStream, _useIPv6);
#endif
await ps.ConnectAsync();
return ps;
ps.Connect();
return ps;
}

protected internal virtual async Task<PooledSocket> CreateSocketAsync()
{
try
{
return await CreateSocketInternalAsync();
}
catch (Exception ex)
catch
{
var endPointStr = _endPoint.ToString().Replace("Unspecified/", string.Empty);
_logger.LogError(ex, $"Failed to {nameof(CreateSocketAsync)} to {endPointStr}");
throw;
try
{
return await CreateSocketInternalAsync();
}
catch (Exception ex)
{
LogCreateSocketError(ex, nameof(CreateSocketAsync));
throw;
}
}
}

//protected internal virtual PooledSocket CreateSocket(IPEndPoint endpoint, TimeSpan connectionTimeout, TimeSpan receiveTimeout)
//{
// PooledSocket retval = new PooledSocket(endPoint, connectionTimeout, receiveTimeout);
private async Task<PooledSocket> CreateSocketInternalAsync()
{
var ps = new PooledSocket(_endPoint, _config.ConnectionTimeout, _config.ReceiveTimeout, _logger,
#if NET5_0_OR_GREATER
_useSslStream, _useIPv6, _sslClientAuthOptions);
#else
_useSslStream, _useIPv6);
#endif
await ps.ConnectAsync();
return ps;
}

// return retval;
//}
private void LogCreateSocketError(Exception ex, string operation)
{
var endPointStr = _endPoint.ToString().Replace("Unspecified/", string.Empty);
_logger.LogError(ex, "Failed to {operation} to {EndPoint}", operation, endPointStr);
}

protected virtual IPooledSocketResult ExecuteOperation(IOperation op)
{
Expand Down
46 changes: 32 additions & 14 deletions src/Enyim.Caching/Memcached/PooledSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ public partial class PooledSocket : IDisposable
private readonly ILogger _logger;

private bool _isAlive;
private bool _useSslStream;
private bool _useIPv6;
private readonly bool _useSslStream;
private readonly bool _useIPv6;
private Socket _socket;
private bool _isSocketDisposed;
private readonly EndPoint _endpoint;
private readonly int _connectionTimeout;

Expand Down Expand Up @@ -74,7 +75,7 @@ public PooledSocket(EndPoint endpoint, TimeSpan connectionTimeout, TimeSpan rece
_socket = socket;
}

public void Connect()
public bool Connect()
{
bool success = false;

Expand All @@ -86,20 +87,21 @@ void Cancel()
{
if (_socket != null && !_socket.Connected)
{
_socket.Dispose();
_socket = null;
DisposeSocket();
}
}

cts.Token.Register(Cancel);

try
{
if (_isSocketDisposed) return false;
_socket.Connect(_endpoint);
}
catch (PlatformNotSupportedException)
{
var ep = GetIPEndPoint(_endpoint);
if (_isSocketDisposed) return false;
_socket.Connect(ep.Address, ep.Port);
}

Expand All @@ -111,8 +113,7 @@ void Cancel()
}
else
{
_socket.Dispose();
_socket = null;
DisposeSocket();
}
}

Expand All @@ -133,17 +134,24 @@ void Cancel()
{
_inputStream = new NetworkStream(_socket);
}

return true;
}
else
{
throw new TimeoutException($"Could not connect to {_endpoint}.");
}
}

public async Task ConnectAsync()
public async Task<bool> ConnectAsync()
{
bool success = false;

if (_isSocketDisposed)
{
return false;
}

try
{
var connTask = _socket.ConnectAsync(_endpoint);
Expand All @@ -156,8 +164,7 @@ public async Task ConnectAsync()
{
if (_socket != null)
{
_socket.Dispose();
_socket = null;
DisposeSocket();
}

throw new TimeoutException($"Timeout to connect to {_endpoint}.");
Expand All @@ -166,6 +173,7 @@ public async Task ConnectAsync()
catch (PlatformNotSupportedException)
{
var ep = GetIPEndPoint(_endpoint);
if (_isSocketDisposed) return false;
await _socket.ConnectAsync(ep.Address, ep.Port);
}

Expand All @@ -177,8 +185,7 @@ public async Task ConnectAsync()
}
else
{
_socket.Dispose();
_socket = null;
DisposeSocket();
}
}

Expand All @@ -199,6 +206,8 @@ await _sslStream.AuthenticateAsClientAsync(
{
_inputStream = new NetworkStream(_socket);
}

return true;
}
else
{
Expand Down Expand Up @@ -333,7 +342,7 @@ protected void Dispose(bool disposing)
}
catch (Exception e)
{
_logger.LogError(nameof(PooledSocket), e);
_logger.LogError(e, nameof(PooledSocket));
}
}
else
Expand All @@ -352,8 +361,17 @@ void IDisposable.Dispose()

private void CheckDisposed()
{
if (_socket == null)
if (_isSocketDisposed || _socket == null)
{
throw new ObjectDisposedException("PooledSocket");
}
}

private void DisposeSocket()
{
_isSocketDisposed = true;
_socket.Dispose();
_socket = null;
}

/// <summary>
Expand Down
Loading