Skip to content

Commit

Permalink
Implemented new OutputStreamAppenderResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
spaghettidba committed Jan 5, 2023
1 parent 4655121 commit b3d5e6d
Show file tree
Hide file tree
Showing 6 changed files with 407 additions and 75 deletions.
194 changes: 194 additions & 0 deletions XESmartTarget.Core/Responses/OutputStreamAppenderResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
using Microsoft.SqlServer.XEvent.Linq;
using NLog;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web.Script.Serialization;
using System.Windows.Forms.Design;
using XESmartTarget.Core.Utils;

namespace XESmartTarget.Core.Responses
{
[Serializable]
public class OutputStreamAppenderResponse : Response
{
private static Logger logger = LogManager.GetCurrentClassLogger();

private static object Lock = new object();
public List<string> OutputColumns { get; set; } = new List<string>();
public string OutputMeasurement { get; set; }
public DataTableJsonAdapter.OutputFormatEnum JsonOutputFormat { get; set; }

private OutputFormatEnum _outputFormat;
public string OutputFormat {
get
{
return _outputFormat.ToString();
}
set
{
try
{
_outputFormat = (OutputFormatEnum)Enum.Parse(typeof(OutputFormatEnum), value);
}
catch
{
throw new ArgumentException($"'{value}' is not a valid OutputFormat.");
}
}
}

protected TextWriter Writer { get; set; }
private string _output;

protected virtual string Output
{
get
{
return _output;
}
set
{
_output = value;
if(_output.Equals("stderr",StringComparison.CurrentCultureIgnoreCase))
{
Writer = Console.Error;
}
else if (_output.Equals("stdout", StringComparison.CurrentCultureIgnoreCase))
{
Writer = Console.Out;
}
else if (isValidPath(_output))
{
Writer = new StreamWriter(new FileStream(_output, FileMode.Append, FileAccess.Write));
}

}
}

private bool isValidPath(string value)
{
try
{
Path.GetFullPath(value);
return Path.IsPathRooted(value);
}
catch (Exception)
{
return false;
}
}

protected enum OutputFormatEnum
{
LineProtocol,
Json,
Csv
}

protected DataTable EventsTable { get; set; } = new DataTable("events");
private XEventDataTableAdapter xeadapter;
protected Task writerTask;
private bool writerTaskStarted;

public OutputStreamAppenderResponse()
{
Output = "stdout";
logger.Info(String.Format("Initializing Response of Type '{0}'", this.GetType().FullName));
}

public override void Process(PublishedEvent evt)
{
Enqueue(evt);
}

protected void Enqueue(PublishedEvent evt)
{
if (xeadapter == null)
{
xeadapter = new XEventDataTableAdapter(EventsTable);
xeadapter.Filter = this.Filter;
xeadapter.OutputColumns = new List<OutputColumn>(OutputColumns.Select(col => new OutputColumn(col)));
}
xeadapter.ReadEvent(evt);
if (!writerTaskStarted)
{
StartWriterTask();
}
}

private void StartWriterTask()
{
if (writerTask == null)
{
writerTask = Task.Factory.StartNew(() => WriteTaskMain());
}
writerTaskStarted = true;
}

private void WriteTaskMain()
{
while (true)
{
try
{
Write();
Thread.Sleep(100);
}
catch (Exception e)
{
logger.Error("Error writing to the output stream");
logger.Error(e);
}
}
}

protected void Write()
{
lock (EventsTable)
{
lock (Lock)
{
if (_outputFormat == OutputFormatEnum.Json)
{
DataTableJsonAdapter adapter = new DataTableJsonAdapter(EventsTable)
{
OutputFormat = DataTableJsonAdapter.OutputFormatEnum.IndependentObjects,
OutputColumns = xeadapter.OutputColumns.Select(x => x.Name).ToArray()
};
if(OutputMeasurement != null)
{
adapter.StaticAttributes.Add("OutputMeasurement", OutputMeasurement);
}
adapter.WriteToStream(Writer);
}
else if(_outputFormat == OutputFormatEnum.LineProtocol)
{
DataTableLineProtocolAdapter adapter = new DataTableLineProtocolAdapter(EventsTable)
{
OutputMeasurement = OutputMeasurement
// todo: set output fields and tags
};
adapter.WriteToStream(Writer);
}
else if (_outputFormat == OutputFormatEnum.Csv)
{
DataTableCSVAdapter adapter = new DataTableCSVAdapter(EventsTable)
{
OutputColumns = xeadapter.OutputColumns.Select(x => x.Name).ToArray()
};
adapter.WriteToStream(Writer);
}
}

EventsTable.Rows.Clear();
}
}
}
}
52 changes: 31 additions & 21 deletions XESmartTarget.Core/Utils/DataTableCSVAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ namespace XESmartTarget.Core.Utils
public class DataTableCSVAdapter
{
private DataTable Table { get; set; }
private string[] OutputColumns { get; set; }
public string[] OutputColumns { get; set; }
public String OutputFile { get; set; }
public bool HeadersWritten { get; private set; }

public DataTableCSVAdapter(DataTable table)
{
Expand All @@ -32,7 +33,6 @@ public DataTableCSVAdapter(DataTable table, String outFile)
public DataTableCSVAdapter(DataTable table, String outFile, string[] outColumns)
{
OutputFile = outFile;
Table = table.DefaultView.ToTable(false,outColumns);
}

[MethodImpl(MethodImplOptions.Synchronized)]
Expand All @@ -42,31 +42,41 @@ public void WriteToFile(bool writeHeaders)
{
using (TextWriter textWriter = new StreamWriter(f))
{
var csv = new CsvWriter(textWriter, CultureInfo.CurrentCulture);
WriteToStream(textWriter);
}
}

if (writeHeaders)
{
foreach (DataColumn dc in Table.Columns)
{
csv.WriteField(dc.ColumnName);
}
csv.NextRecord();
}
}

foreach (DataRow dr in Table.Rows)
{
foreach (DataColumn dc in Table.Columns)
{
csv.WriteField(dr[dc.ColumnName]);
}
csv.NextRecord();
}
public void WriteToStream(TextWriter writer)
{
var csv = new CsvWriter(writer, CultureInfo.CurrentCulture);

csv.Flush();
if (!HeadersWritten)
{
foreach (DataColumn dc in Table.Columns)
{
csv.WriteField(dc.ColumnName);
}
csv.NextRecord();
HeadersWritten = true;
}

if(OutputColumns!= null)
{
Table = Table.DefaultView.ToTable(false, OutputColumns);
}

foreach (DataRow dr in Table.Rows)
{
foreach (DataColumn dc in Table.Columns)
{
csv.WriteField(dr[dc.ColumnName]);
}
csv.NextRecord();
}

csv.Flush();
}

}
}
112 changes: 112 additions & 0 deletions XESmartTarget.Core/Utils/DataTableJsonAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
using CsvHelper;
using DouglasCrockford.JsMin;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using System;
using System.Collections.Generic;
using System.Data;
using System.Dynamic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Web.Script.Serialization;

namespace XESmartTarget.Core.Utils
{
public class DataTableJsonAdapter
{
private DataTable Table;

public string[] OutputColumns { get; set; }
public Dictionary<string,object> StaticAttributes { get; set; } = new Dictionary<string,object>();
public bool Minify { get; set; }
public enum OutputFormatEnum {
SingleObject,
ObjectArray,
IndependentObjects
}

public OutputFormatEnum OutputFormat { get; set; }

public DataTableJsonAdapter(DataTable eventsTable)
{
this.Table = eventsTable;
}

public void WriteToStream(TextWriter writer)
{
var converter = new ExpandoObjectConverter();
var minifier = new JsMinifier();

if (OutputColumns != null)
{
Table = Table.DefaultView.ToTable(false, OutputColumns);
}

List<Object> outputList = null;
if(OutputFormat == OutputFormatEnum.ObjectArray)
{
outputList = new List<Object>();
}

foreach (DataRow dr in Table.Rows)
{
// create dynamic object from current row
dynamic data = new ExpandoObject();

IDictionary<string, object> outputObject = (IDictionary<string, object>)data;
foreach (DataColumn dc in Table.Columns)
{
outputObject.Add(dc.ColumnName, dr[dc.ColumnName]);
}
foreach (var k in StaticAttributes.Keys)
{
outputObject.Add(k, StaticAttributes[k]);
}

if (OutputFormat == OutputFormatEnum.ObjectArray)
{
outputList.Add(outputObject);
}
else if(OutputFormat== OutputFormatEnum.IndependentObjects)
{
string outString = JsonConvert.SerializeObject(outputObject, converter);
if (Minify)
{
outString = minifier.Minify(outString);
}
writer.WriteLine(outString);
}

}

if (OutputFormat == OutputFormatEnum.ObjectArray)
{
string outString = JsonConvert.SerializeObject(outputList.ToArray(), converter);
if (Minify)
{
outString = minifier.Minify(outString);
}
writer.WriteLine(outString);
}
else if (OutputFormat == OutputFormatEnum.SingleObject)
{
dynamic data = new ExpandoObject();
IDictionary<string, object> outputObject = (IDictionary<string, object>)data;
outputObject.Add("data", outputList);

string outString = JsonConvert.SerializeObject(outputObject, converter);
if (Minify)
{
outString = minifier.Minify(outString);
}
writer.WriteLine(outString);
}

writer.Flush();
}

}
}
Loading

0 comments on commit b3d5e6d

Please sign in to comment.