Skip to content

Commit

Permalink
Log forwarding support
Browse files Browse the repository at this point in the history
Fixes #140
  • Loading branch information
cretz committed Nov 8, 2023
1 parent 5b10051 commit cf45e50
Show file tree
Hide file tree
Showing 15 changed files with 532 additions and 42 deletions.
8 changes: 8 additions & 0 deletions src/Temporalio/Bridge/ByteArrayRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ public static ByteArrayRef FromUTF8(string s)
return new ByteArrayRef(StrictUTF8.GetBytes(s));
}

/// <summary>
/// Copy a byte array ref contents to a UTF8 string.
/// </summary>
/// <param name="byteArray">Byte array ref.</param>
/// <returns>String.</returns>
public static unsafe string ToUtf8(Interop.ByteArrayRef byteArray) =>
StrictUTF8.GetString(byteArray.data, (int)byteArray.size);

/// <summary>
/// Convert an enumerable set of metadata pairs to a byte array. No key or value may contain
/// a newline.
Expand Down
3 changes: 3 additions & 0 deletions src/Temporalio/Bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/Temporalio/Bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ prost-types = "0.11"
# cause non-determinism.
rand = "0.8.5"
rand_pcg = "0.3.1"
serde_json = "1.0"
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] }
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
Expand All @@ -28,6 +29,7 @@ tokio = "1.26"
tokio-stream = "0.1"
tokio-util = "0.7"
tonic = "0.9"
tracing = "0.1"
url = "2.2"

[profile.release]
Expand Down
84 changes: 84 additions & 0 deletions src/Temporalio/Bridge/ForwardedLog.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Microsoft.Extensions.Logging;

namespace Temporalio.Bridge
{
/// <summary>
/// Representation of log state for a Core log.
/// </summary>
/// <param name="Level">Log level.</param>
/// <param name="Target">Log target.</param>
/// <param name="Message">Log message.</param>
/// <param name="TimestampMilliseconds">Ms since Unix epoch.</param>
/// <param name="Fields">JSON fields, or null to not include.</param>
internal record ForwardedLog(
LogLevel Level,
string Target,
string Message,
ulong TimestampMilliseconds,
IDictionary<string, JsonElement>? Fields) : IReadOnlyList<KeyValuePair<string, object?>>
{
// Unfortunately DateTime.UnixEpoch not in standard library in all versions we need
private static readonly DateTime UnixEpoch = new(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

/// <summary>
/// Gets the timestamp for this log.
/// </summary>
public DateTime Timestamp => UnixEpoch.AddMilliseconds(TimestampMilliseconds);

/// <inheritdoc />
public int Count => 5;

/// <inheritdoc />
public KeyValuePair<string, object?> this[int index]
{
get
{
switch (index)
{
case 0:
return new("Level", Level);
case 1:
return new("Target", Target);
case 2:
return new("Message", Message);
case 3:
return new("Timestamp", Timestamp);
case 4:
return new("Fields", Fields);
default:
#pragma warning disable CA2201 // We intentionally use this usually-internal-use-only exception
throw new IndexOutOfRangeException(nameof(index));
#pragma warning restore CA2201
}
}
}

/// <inheritdoc />
public IEnumerator<KeyValuePair<string, object?>> GetEnumerator()
{
for (int i = 0; i < Count; ++i)
{
yield return this[i];
}
}

/// <inheritdoc />
public override string ToString()
{
var message = $"[sdk_core::{Target}] {Message}";
if (Fields is { } fields)
{
message += " " + string.Join(", ", fields.Select(kv => $"{kv.Key}={kv.Value}"));
}
return message;
}

/// <inheritdoc />
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
}
36 changes: 34 additions & 2 deletions src/Temporalio/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@

namespace Temporalio.Bridge.Interop
{
internal enum ForwardedLogLevel
{
Trace = 0,
Debug,
Info,
Warn,
Error,
}

internal enum MetricAttributeValueType
{
String = 1,
Expand Down Expand Up @@ -44,6 +53,10 @@ internal partial struct EphemeralServer
{
}

internal partial struct ForwardedLog
{
}

internal partial struct MetricAttributes
{
}
Expand Down Expand Up @@ -251,13 +264,16 @@ internal unsafe partial struct RuntimeOrFail
public ByteArray* fail;
}

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
internal unsafe delegate void ForwardedLogCallback([NativeTypeName("enum ForwardedLogLevel")] ForwardedLogLevel level, [NativeTypeName("const struct ForwardedLog *")] ForwardedLog* log);

internal partial struct LoggingOptions
{
[NativeTypeName("struct ByteArrayRef")]
public ByteArrayRef filter;

[NativeTypeName("bool")]
public byte forward;
[NativeTypeName("ForwardedLogCallback")]
public IntPtr forward_to;
}

internal partial struct OpenTelemetryOptions
Expand Down Expand Up @@ -622,6 +638,22 @@ internal static unsafe partial class Methods
[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void byte_array_free([NativeTypeName("struct Runtime *")] Runtime* runtime, [NativeTypeName("const struct ByteArray *")] ByteArray* bytes);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
[return: NativeTypeName("struct ByteArrayRef")]
public static extern ByteArrayRef forwarded_log_target([NativeTypeName("const struct ForwardedLog *")] ForwardedLog* log);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
[return: NativeTypeName("struct ByteArrayRef")]
public static extern ByteArrayRef forwarded_log_message([NativeTypeName("const struct ForwardedLog *")] ForwardedLog* log);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
[return: NativeTypeName("uint64_t")]
public static extern ulong forwarded_log_timestamp_millis([NativeTypeName("const struct ForwardedLog *")] ForwardedLog* log);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
[return: NativeTypeName("struct ByteArrayRef")]
public static extern ByteArrayRef forwarded_log_fields_json([NativeTypeName("const struct ForwardedLog *")] ForwardedLog* log);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void ephemeral_server_start_dev_server([NativeTypeName("struct Runtime *")] Runtime* runtime, [NativeTypeName("const struct DevServerOptions *")] DevServerOptions* options, void* user_data, [NativeTypeName("EphemeralServerStartCallback")] IntPtr callback);

Expand Down
3 changes: 2 additions & 1 deletion src/Temporalio/Bridge/OptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ public static unsafe Interop.LoggingOptions ToInteropOptions(
return new Interop.LoggingOptions()
{
filter = scope.ByteArray(options.Filter.FilterString),
forward = (byte)0,
// Forward callback is set in the Runtime constructor
// forward_to = ???
};
}

Expand Down
69 changes: 67 additions & 2 deletions src/Temporalio/Bridge/Runtime.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Text.Json;
using Microsoft.Extensions.Logging;

namespace Temporalio.Bridge
{
Expand All @@ -8,6 +11,12 @@ namespace Temporalio.Bridge
/// </summary>
internal class Runtime : SafeHandle
{
private static readonly Func<ForwardedLog, Exception?, string> ForwardLogMessageFormatter =
LogMessageFormatter;

private readonly ILogger? forwardToLogger;
private readonly bool forwardLoggerIncludeFields;

/// <summary>
/// Initializes a new instance of the <see cref="Runtime"/> class.
/// </summary>
Expand All @@ -20,11 +29,29 @@ public Runtime(Temporalio.Runtime.TemporalRuntimeOptions options)
{
unsafe
{
// Setup forwarding logger
IntPtr? forwardingLogCallback = null;
if (options.Telemetry.Logging?.Forwarding is { } forwarding)
{
if (forwarding.Logger == null)
{
throw new ArgumentException("Must have logger on forwarding options");
}
forwardToLogger = forwarding.Logger;
forwardLoggerIncludeFields = forwarding.IncludeFields;
forwardingLogCallback = scope.FunctionPointer<Interop.ForwardedLogCallback>(OnLog);
}

// WARNING: It is important that this options is immediately passed to new
// because we have allocated a pointer for the custom meter which can only be
// freed on the Rust side on error
var runtimeOptions = scope.Pointer(options.ToInteropOptions(scope));
var res = Interop.Methods.runtime_new(runtimeOptions);
var runtimeOptions = options.ToInteropOptions(scope);
// Set log forwarding if enabled
if (forwardingLogCallback != null)
{
runtimeOptions.telemetry->logging->forward_to = forwardingLogCallback.Value;
}
var res = Interop.Methods.runtime_new(scope.Pointer(runtimeOptions));
// If it failed, copy byte array, free runtime and byte array. Otherwise just
// return runtime.
if (res.fail != null)
Expand Down Expand Up @@ -71,5 +98,43 @@ protected override unsafe bool ReleaseHandle()
Interop.Methods.runtime_free(Ptr);
return true;
}

private static string LogMessageFormatter(ForwardedLog state, Exception? error) =>
state.ToString();

private unsafe void OnLog(Interop.ForwardedLogLevel coreLevel, Interop.ForwardedLog* coreLog)
{
if (forwardToLogger is not { } logger)
{
return;
}
// Fortunately the Core log levels integers match .NET ones
var level = (LogLevel)coreLevel;
// Go no further if not enabled
if (!logger.IsEnabled(level))
{
return;
}
// If the fields are requested, we will try to convert from JSON
IDictionary<string, JsonElement>? fields = null;
if (forwardLoggerIncludeFields)
{
var fieldsJson = ByteArrayRef.ToUtf8(Interop.Methods.forwarded_log_fields_json(coreLog));
try
{
fields = JsonSerializer.Deserialize<SortedDictionary<string, JsonElement>?>(fieldsJson);
}
catch (JsonException)
{
}
}
var log = new ForwardedLog(
Level: level,
Target: ByteArrayRef.ToUtf8(Interop.Methods.forwarded_log_target(coreLog)),
Message: ByteArrayRef.ToUtf8(Interop.Methods.forwarded_log_message(coreLog)),
TimestampMilliseconds: Interop.Methods.forwarded_log_timestamp_millis(coreLog),
Fields: fields);
logger.Log(level, 0, log, null, ForwardLogMessageFormatter);
}
}
}
29 changes: 28 additions & 1 deletion src/Temporalio/Bridge/include/temporal-sdk-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
#include <stdint.h>
#include <stdlib.h>

typedef enum ForwardedLogLevel {
Trace = 0,
Debug,
Info,
Warn,
Error,
} ForwardedLogLevel;

typedef enum MetricAttributeValueType {
String = 1,
Int,
Expand Down Expand Up @@ -36,6 +44,8 @@ typedef struct Client Client;

typedef struct EphemeralServer EphemeralServer;

typedef struct ForwardedLog ForwardedLog;

typedef struct MetricAttributes MetricAttributes;

typedef struct MetricInteger MetricInteger;
Expand Down Expand Up @@ -168,9 +178,18 @@ typedef struct RuntimeOrFail {
const struct ByteArray *fail;
} RuntimeOrFail;

/**
* Operations on the log can only occur within the callback, it is freed
* immediately thereafter.
*/
typedef void (*ForwardedLogCallback)(enum ForwardedLogLevel level, const struct ForwardedLog *log);

typedef struct LoggingOptions {
struct ByteArrayRef filter;
bool forward;
/**
* This callback is expected to work for the life of the runtime.
*/
ForwardedLogCallback forward_to;
} LoggingOptions;

typedef struct OpenTelemetryOptions {
Expand Down Expand Up @@ -432,6 +451,14 @@ void runtime_free(struct Runtime *runtime);

void byte_array_free(struct Runtime *runtime, const struct ByteArray *bytes);

struct ByteArrayRef forwarded_log_target(const struct ForwardedLog *log);

struct ByteArrayRef forwarded_log_message(const struct ForwardedLog *log);

uint64_t forwarded_log_timestamp_millis(const struct ForwardedLog *log);

struct ByteArrayRef forwarded_log_fields_json(const struct ForwardedLog *log);

/**
* Runtime must live as long as server. Options and user data must live through
* callback.
Expand Down
Loading

0 comments on commit cf45e50

Please sign in to comment.