Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into release/v1.19
Browse files Browse the repository at this point in the history
  • Loading branch information
haga-rak committed Feb 5, 2024
2 parents f28106e + 24a458f commit 4d76bf1
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 26 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
[![gitter](https://img.shields.io/badge/docs-latest-b36567)](https://docs.fluxzy.io/documentation/core/introduction.html)


[Features](#1-features) | [Quick usage (.NET)](#2-quick-usage) | [Quick usage (CLI)](#sample-usage) | [Documentation](https://docs.fluxzy.io/documentation/core/introduction.html) | [Build](#3-build) | [License](LICENSE.MD) | [Releases](https://github.com/haga-rak/fluxzy.core/releases)
[Features](#1-features) | [Quick usage (.NET)](#2-quick-usage) | [Quick usage (CLI)](#sample-usage) | [Documentation](https://docs.fluxzy.io/documentation/core/introduction.html) | [Build](#3-build) | [License](LICENSE.md) | [Releases](https://github.com/haga-rak/fluxzy.core/releases)

</div>

Expand Down
21 changes: 15 additions & 6 deletions src/Fluxzy.Core/Archiving/Writers/DirectoryArchiveWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,24 @@ public override void UpdateTags(IEnumerable<Tag> tags)
UpdateMeta(true);
}

public override bool Update(ExchangeInfo exchangeInfo, CancellationToken cancellationToken)
protected override bool ExchangeUpdateRequired(Exchange exchange)
{
if (_saveFilter != null && !_saveFilter.Apply(
null,
new Authority(exchangeInfo.KnownAuthority,
exchangeInfo.KnownPort,
exchangeInfo.Secure), exchangeInfo, null))
if (_saveFilter != null && !_saveFilter.Apply(null, exchange.Authority, exchange, null))
return false;

return true;
}

protected override bool ConnectionUpdateRequired(Connection connection)
{
//if (_saveFilter != null && !_saveFilter.Apply(null, connection.Authority, null, null))
// return false;

return true;
}

public override bool Update(ExchangeInfo exchangeInfo, CancellationToken cancellationToken)
{
var exchangePath = DirectoryArchiveHelper.GetExchangePath(_baseDirectory, exchangeInfo);

DirectoryArchiveHelper.CreateDirectory(exchangePath);
Expand Down
11 changes: 11 additions & 0 deletions src/Fluxzy.Core/Archiving/Writers/EventOnlyArchiveWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Fluxzy.Core;
using Fluxzy.Misc.Streams;

namespace Fluxzy.Writers
Expand All @@ -13,6 +14,11 @@ public override void UpdateTags(IEnumerable<Tag> tags)
{
}

protected override bool ExchangeUpdateRequired(Exchange exchange)
{
return false;
}

public override bool Update(ExchangeInfo exchangeInfo, CancellationToken cancellationToken)
{
return true;
Expand All @@ -22,6 +28,11 @@ public override void Update(ConnectionInfo connectionInfo, CancellationToken can
{
}

protected override bool ConnectionUpdateRequired(Connection connection)
{
return false;
}

protected override void InternalUpdate(DownstreamErrorInfo connectionInfo, CancellationToken cancellationToken)
{

Expand Down
45 changes: 38 additions & 7 deletions src/Fluxzy.Core/Archiving/Writers/RealtimeArchiveWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ public virtual void RegisterExchangeLimit(int? maxExchangeCount, Action onMaxExc
/// <param name="tags">The tags to be updated.</param>
public abstract void UpdateTags(IEnumerable<Tag> tags);

/// <summary>
/// Check whether calling Update(ExchangeInfo, CancellationToken) is necessary
/// </summary>
/// <param name="exchange"></param>
/// <returns></returns>
protected abstract bool ExchangeUpdateRequired(Exchange exchange);

/// <summary>
/// Check whether calling Update(ConnectionInfo, CancellationToken) is necessary
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>

protected abstract bool ConnectionUpdateRequired(Connection connection);

/// <summary>
/// Updates the exchange information.
/// </summary>
Expand Down Expand Up @@ -152,20 +167,28 @@ public virtual void Update(DownstreamErrorInfo errorInfo, CancellationToken canc
ErrorUpdated(this, new DownstreamErrorEventArgs(currentCount));
}


/// <summary>
/// Updates the specified connection.
/// </summary>
/// <param name="connection">The connection to be updated.</param>
/// <param name="cancellationToken">The cancellation token to cancel the operation.</param>
public virtual void Update(Connection connection, CancellationToken cancellationToken)
{
var connectionInfo = new ConnectionInfo(connection);
ConnectionInfo ? connectionInfo = null;

Update(connectionInfo, cancellationToken);
if (ConnectionUpdateRequired(connection))
{
connectionInfo = new ConnectionInfo(connection);
Update(connectionInfo, cancellationToken);
}

// fire event
if (ConnectionUpdated != null)
{
connectionInfo ??= new ConnectionInfo(connection);
ConnectionUpdated(this, new ConnectionUpdateEventArgs(connectionInfo));
}
}

/// <summary>
Expand All @@ -178,21 +201,29 @@ public virtual void Update(
Exchange exchange, ArchiveUpdateType updateType,
CancellationToken cancellationToken)
{
var exchangeInfo = new ExchangeInfo(exchange);

if (updateType == ArchiveUpdateType.AfterResponse) {
if (updateType == ArchiveUpdateType.AfterResponse)
{
var total = Interlocked.Increment(ref InternalTotalProcessedExchanges);

if (total == _maxExchangeCount)
_onMaxExchangeCountReached?.Invoke();
}

if (!Update(exchangeInfo, cancellationToken))
return; // DO NOT fire update event when save filter is on
ExchangeInfo ? exchangeInfo = null;

if (ExchangeUpdateRequired(exchange)) {
exchangeInfo = new ExchangeInfo(exchange);

if (!Update(exchangeInfo, cancellationToken))
return; // DO NOT fire update event when save filter is on
}

// fire event
if (ExchangeUpdated != null)
{
exchangeInfo ??= new ExchangeInfo(exchange);
ExchangeUpdated(this, new ExchangeUpdateEventArgs(exchangeInfo, exchange, updateType));
}
}

protected virtual void Dispose(bool disposing)
Expand Down
12 changes: 12 additions & 0 deletions src/Fluxzy.Core/Clients/H11/Http11HeaderBlockReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ internal static class Http11HeaderBlockReader
"FLUXZY_CONNECTION_DISPOSE_DETECTION_THRESHOLD_TICKS",
TimeSpan.FromMilliseconds(1.5).Ticks);

/// <summary>
/// Keep this buffer length margin above the header size to avoid resizing the buffer
/// when adding control headers to the client
/// </summary>
private static readonly int BufferMargin =
EnvironmentUtility.GetInt32("FLUXZY_RS_BUFFER_MARGIN", 128);

private static readonly byte[] CrLf = { 0x0D, 0x0A, 0x0D, 0x0A };

/// <summary>
Expand Down Expand Up @@ -125,6 +132,11 @@ public static async ValueTask<HeaderBlockReadResult>

headerBlockReceived?.Invoke();

if ((totalRead + BufferMargin) > buffer.Buffer.Length)
{
buffer.Multiply(2);
}

return new HeaderBlockReadResult(indexFound, totalRead, false);
}
}
Expand Down
22 changes: 12 additions & 10 deletions src/Fluxzy.Core/Core/ProxyOrchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,20 @@ await exchange.Context.BreakPointContext.RequestHeaderCompletion
CancellationToken.None
);

if (hasRequestBody) {

// here we have a chance substitute the requestBodyStream
if (exchange.Context.HasRequestBodySubstitution)
{
originalRequestBodyStream = hasRequestBody? exchange.Request.Body : Stream.Null;
exchange.Request.Body = await
exchange.Context.GetSubstitutedRequestBody(exchange.Request.Body!,
exchange);

if (exchange.Context.HasRequestBodySubstitution) {
originalRequestBodyStream = exchange.Request.Body;
exchange.Request.Body = await
exchange.Context.GetSubstitutedRequestBody(exchange.Request.Body!, exchange);
}
exchange.Request.Header.ForceTransferChunked();
}

if (exchange.Request.Body != null &&
(!exchange.Request.Body.CanSeek ||
exchange.Request.Body.Length > 0))
{
exchange.Request.Body = new DispatchStream(exchange.Request.Body!,
true,
_archiveWriter.CreateRequestBodyStream(exchange.Id));
Expand All @@ -190,14 +194,12 @@ await exchange.Context.BreakPointContext.RequestHeaderCompletion

connectionPool = await _poolBuilder.GetPool(exchange, _proxyRuntimeSetting, token);


if (D.EnableTracing)
{
var message = $"[#{exchange.Id}] Pool received";
D.TraceInfo(message);
}


// Actual request send

try
Expand Down
2 changes: 1 addition & 1 deletion src/Fluxzy.Core/Rules/Actions/ForwardAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public ForwardAction(string url)
[ActionDistinctive]
public string Url { get; }

public override FilterScope ActionScope => FilterScope.RequestHeaderReceivedFromClient;
public override FilterScope ActionScope => FilterScope.ResponseHeaderReceivedFromRemote;

public override string DefaultDescription => $"Forward request to {Url}".Trim();

Expand Down
2 changes: 1 addition & 1 deletion test/Fluxzy.Tests/Cases/EarlyCloseNotify.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class EarlyCloseNotify
public async Task Run_Until_Close_Notify(string sslEngine)
{
var count = 3;
var url = "https://sandbox.smartizy.com/swagger/index.html";
var url = "https://www.example.com/";

await using var proxy = new AddHocConfigurableProxy(1, 10);

Expand Down
1 change: 1 addition & 0 deletions test/Fluxzy.Tests/Fluxzy.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
<PackageReference Include="NSubstitute" Version="5.1.0" />
<PackageReference Include="System.Threading.AccessControl" Version="8.0.0" />
<PackageReference Include="xunit" Version="2.6.2" />
<PackageReference Include="Xunit.Combinatorial" Version="1.6.24" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.4">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Xunit;

namespace Fluxzy.Tests.UnitTests.RsBuffer
{
public class RequestProcessingBufferTests
{
[Theory]
[MemberData(nameof(GetCheckBufferLimitArgs))]
public async Task CheckAroundBufferLimit(int expectedSize)
{
var setting = FluxzySetting.CreateLocalRandomPort();
var url = "https://sandbox.smartizy.com/ip";

var dummyHeaderName = ComputeProvisionalHeaderLength(url, out var headerSize);

var count = 3;

await using var proxy = new Proxy(setting);

using var client = HttpClientUtility.CreateHttpClient(proxy.Run(), setting);

var remainingBuffer = expectedSize - headerSize;

var headerValue = new string('a', remainingBuffer);

for (int i = 0; i < count; i++) {
var requestMessage = new HttpRequestMessage(HttpMethod.Get, url);
requestMessage.Headers.TryAddWithoutValidation(dummyHeaderName, headerValue);

var response = await client.SendAsync(requestMessage);
var responseStream = await response.Content.ReadAsStreamAsync();

await responseStream.CopyToAsync(Stream.Null);

Assert.Equal(HttpStatusCode.OK, response.StatusCode);
}
}

private static string ComputeProvisionalHeaderLength(string url, out int headerSize)
{
var dummyHeaderName = "X-Padding";

headerSize = "GET HTTP/1.1\r\n\r\n\r\n".Length + "Host: \r\n".Length;

headerSize += url.Length;
headerSize += $"{dummyHeaderName}: \r\n".Length;

return dummyHeaderName;
}

public static IEnumerable<object[]> GetCheckBufferLimitArgs()
{
yield return new object[] { 500 };
yield return new object[] { 5192 };

var defaultBufferSize = FluxzySharedSetting.RequestProcessingBuffer;

var marginCount = 32;

for (int i = (defaultBufferSize - marginCount); i <= (defaultBufferSize + marginCount); i+=4) {

yield return new object[] { i };
}
}
}
}
Loading

0 comments on commit 4d76bf1

Please sign in to comment.