From 7226612a0f6294b6fed3af0668b08d0b48ae9155 Mon Sep 17 00:00:00 2001 From: Stanislav Muhametsin <346799+stazz@users.noreply.github.com> Date: Tue, 19 Mar 2019 23:45:50 +0200 Subject: [PATCH] Documenting the UtilPack.ProcessMonitor project. --- .../UtilPack.ProcessMonitor/ProcessMonitor.cs | 502 +++++++++++------- .../UtilPack.ProcessMonitor.csproj | 2 +- 2 files changed, 300 insertions(+), 204 deletions(-) diff --git a/Source/Code/UtilPack.ProcessMonitor/ProcessMonitor.cs b/Source/Code/UtilPack.ProcessMonitor/ProcessMonitor.cs index 6cd2d33..4ac5350 100644 --- a/Source/Code/UtilPack.ProcessMonitor/ProcessMonitor.cs +++ b/Source/Code/UtilPack.ProcessMonitor/ProcessMonitor.cs @@ -39,26 +39,284 @@ namespace UtilPack.ProcessMonitor { + /// + /// This static class provides methods for executing processes while keeping an eye on given . + /// + /// + /// The invokable process is assumed to implement graceful shutdown whenever semaphore with given name detects cancellation signal from this process. + /// See and for more details about implementing such functionality in the invoked process. + /// + public static class ProcessMonitorWithGracefulCancelability + { + /// + /// The default maximum time to wait for process to gracefully terminate after the cancellation token is canceled. + /// The value is 1 second. + /// + public static TimeSpan DefaultShutdownSemaphoreWaitTime = TimeSpan.FromSeconds( 1 ); + + /// + /// Asynchronously starts process at given path, optionally writes input to it, then waits for it to exit while keeping an eye for given , and invokes given deserialization callback on contents of standard output, if the execution was successful. + /// + /// The deserialized type of standard output. + /// The path to the process executable. + /// The parameters for the process. + /// The name of the semaphore used to signal graceful shutdown after cancellation. Do not use "Global\" prefix. + /// The callback to deserialize from standard output, in case process returned successfully. + /// The to use to check on cancellation. + /// The optional callback to write input to the process. If no value is specified, the process will not have standard input. + /// The maximum time to wait for process to gracefully terminate after the cancellation token is canceled. By default, is value of . + /// Asynchronously returns either deserialized instance of , or error string. The error string will be null if given is canceled, otherwise it will be either contents of standard error, or fixed error message. + public static async Task> CallProcessAndGetResultAsync( + String processPath, + String +#if !NET40 && !NET45 && !NETSTANDARD2_0 && !NETCOREAPP1_1 && !NETCOREAPP2_0 + [] +#endif + processArguments, + String shutdownSemaphoreName, + Func outputDeserializer, + CancellationToken token, + Func inputWriter = null, + TimeSpan? shutdownSemaphoreMaxWaitTime = default + ) + { + + (var stdinWriter, var stdinSuccess) = inputWriter == null ? default : GetStdInWriter( inputWriter ); + (var exitCode, var stdout, var stderr) = await CallProcessAndCollectOutputToString( + processPath, + processArguments, + shutdownSemaphoreName, + token, + stdinWriter: stdinWriter, + shutdownSemaphoreMaxWaitTime: shutdownSemaphoreMaxWaitTime + ); + + String GetErrorString() + { + var errorString = stderr.ToString(); + return exitCode.HasValue ? + ( String.IsNullOrEmpty( errorString ) ? ( exitCode == 0 ? "Unspecified error" : $"Non-zero return code of {processPath}" ) : errorString ) + : null; + } + + return stderr.Length > 0 || !CheckStdInSuccess( stdinSuccess ) || !exitCode.HasValue || exitCode.Value != 0 ? + new EitherOr( GetErrorString() ) : + outputDeserializer( stdout ); + } + + // /// + // /// Asynchronously starts process at given path, optionally writes input to it, then waits for it to exit while keeping an eye for given , and streaming standard output and error streams. + // /// + // /// The path to the process executable. + // /// The parameters for the process. + // /// The name of the semaphore used to signal graceful shutdown after cancellation. Do not use "Global\" prefix. + // /// The callback invoked on each event of . The tuple first item is the string, the tuple second item is true if the string originates from error stream, and third item is the UTC when it was received by . + // /// The to use to check on cancellation. + // /// The optional callback to write input to the process. If no value is specified, the process will not have standard input. + // /// The maximum time to wait for process to gracefully terminate after the cancellation token is canceled. By default, is value of . + // /// Asynchronously returns the process exit code. Will return null if given is canceled. + // /// If the given was specified, but did not complete successfully. + // public static async Task CallProcessAndStreamOutputAsync( + // String processPath, + // String + //#if !NET40 && !NET45 && !NETSTANDARD2_0 && !NETCOREAPP1_1 && !NETCOREAPP2_0 + // [] + //#endif + // processArguments, + // String shutdownSemaphoreName, + // Func<(String Data, Boolean IsError, DateTime Timestamp), Task> onStdOutOrErrLine, + // CancellationToken token, + // Func inputWriter = null, + // TimeSpan? shutdownSemaphoreMaxWaitTime = default + // ) + // { + // (var stdinWriter, var stdinSuccess) = inputWriter == null ? default : GetStdInWriter( inputWriter ); + // var returnCode = await processPath.CallProcessWithRedirects( + // processArguments, + // shutdownSemaphoreName, + // token, + // stdinWriter: stdinWriter, + // shutdownSemaphoreMaxWaitTime: shutdownSemaphoreMaxWaitTime, + // onStdOutOrErrLine: onStdOutOrErrLine + // ); + + // return CheckStdInSuccess( stdinSuccess ) ? + // returnCode : + // throw new InvalidOperationException( "Standard input writer did not complete successfully." ); + // } + + private static Boolean CheckStdInSuccess( + Func stdInSuccess + ) + { + return stdInSuccess == null || stdInSuccess(); + } + + private static (Func StdInWriter, Func StdInSuccess) GetStdInWriter( + Func inputWriter + ) + { + var stdinSuccess = false; + return ( + async stdin => + { + try + { + await inputWriter( stdin ); + stdinSuccess = true; + } + catch + { + // Ignore + } + } + , + () => stdinSuccess + ); + } + + + + /// + /// Asynchronously starts process at given path, optionally writes input to it, then waits for it to exit while keeping an eye for given , and streaming standard output and error streams. + /// + /// The path to the process executable. + /// The parameters for the process. + /// The name of the semaphore used to signal graceful shutdown after cancellation. Do not use "Global\" prefix. + /// The to use to check on cancellation. + /// The optional callback to write input to the process. If no value is specified, the process will not have standard input. + /// The maximum time to wait for process to gracefully terminate after the cancellation token is canceled. By default, is value of . + /// Asynchronously returns the process exit code. Will return null if given is canceled. Along with the exit code are also returned standard output and error instances. + public static async Task<(Int32? ReturnCode, StringBuilder StdOut, StringBuilder StdErr)> CallProcessAndCollectOutputToString( + String processPath, + String +#if !NET40 && !NET45 && !NETSTANDARD2_0 && !NETCOREAPP1_1 && !NETCOREAPP2_0 + [] +#endif + processArguments, + String shutdownSemaphoreName, + CancellationToken token, + Func stdinWriter = null, + TimeSpan? shutdownSemaphoreMaxWaitTime = null + ) + { + var stdout = new StringBuilder(); + var stderr = new StringBuilder(); + var retVal = await CallProcessWithRedirects( + processPath, + processArguments, + shutdownSemaphoreName, + token, + stdinWriter: stdinWriter, + shutdownSemaphoreMaxWaitTime: shutdownSemaphoreMaxWaitTime, + onStdOutOrErrLine: tuple => + { + (var line, var isError, var timestamp) = tuple; + ( isError ? stderr : stdout ).Append( line ).Append( '\n' ); + return null; + } ); + return (retVal, stdout, stderr); + } + + + /// + /// Asynchronously starts process at given path, optionally writes input to it, then waits for it to exit while keeping an eye for given , and streaming standard output and error streams. + /// + /// The path to the process executable. + /// The parameters for the process. + /// The name of the semaphore used to signal graceful shutdown after cancellation. Do not use "Global\" prefix. + /// The to use to check on cancellation. + /// The optional callback to write input to the process. If no value is specified, the process will not have standard input. + /// The maximum time to wait for process to gracefully terminate after the cancellation token is canceled. By default, is value of . + /// Asynchronously returns the process exit code. Will return null if given is canceled. + public static async Task CallProcessWithRedirects( + String processPath, + String +#if !NET40 && !NET45 && !NETSTANDARD2_0 && !NETCOREAPP1_1 && !NETCOREAPP2_0 + [] +#endif + processArguments, + String shutdownSemaphoreName, + CancellationToken token, + Func stdinWriter = null, + TimeSpan? shutdownSemaphoreMaxWaitTime = null, + Func<(String Data, Boolean IsError, DateTime Timestamp), Task> onStdOutOrErrLine = null + ) + { + var processOutput = new ConcurrentQueue<(String Data, Boolean IsError, DateTime Timestamp)>(); + try + { + return await ProcessUtils.StartAndWaitForExitAsync( + ProcessUtils.CreateProcess( + processPath, + processArguments, + onStdOutLine: outLine => processOutput.Enqueue( (outLine, false, DateTime.UtcNow) ), + onStdErrLine: errLine => processOutput.Enqueue( (errLine, true, DateTime.UtcNow) ) ), + shutdownSemaphoreName, + token, + stdinWriter: stdinWriter, + onTick: async () => await ProcessOutput( processOutput, onStdOutOrErrLine ), + shutdownSemaphoreMaxWaitTime: shutdownSemaphoreMaxWaitTime + ); + } + finally + { + // Flush any 'leftover' messages + await ProcessOutput( processOutput, onStdOutOrErrLine ); + } + } + + private static async Task ProcessOutput( + ConcurrentQueue<(String Data, Boolean IsError, DateTime Timestamp)> processOutput, + Func<(String Data, Boolean IsError, DateTime Timestamp), Task> onStdOutOrErrLine + ) + { + if ( onStdOutOrErrLine == null ) + { + onStdOutOrErrLine = ( tuple ) => ( tuple.IsError ? Console.Error : Console.Out ).WriteLineAsync( tuple.Data ); + } + + while ( processOutput.TryDequeue( out var output ) ) + { + var t = onStdOutOrErrLine( output ); + if ( t != null ) + { + await t; + } + } + } + + } + /// + /// This class is mainly for internal usage, but is exposed in case some functionality is needed from other libraries. + /// public static class ProcessUtils { - + /// + /// Creates a new instance of but doesn't start it. + /// + /// The path to the process executable. + /// The parameters for the process. + /// The optional callback to invoke when standard output has been received. + /// The optional callback to invoke when standard error output has been received. + /// public static Process CreateProcess( - String fileName, + String processPath, String #if !NET40 && !NET45 && !NETSTANDARD2_0 && !NETCOREAPP1_1 && !NETCOREAPP2_0 [] #endif - arguments, + processArguments, Action onStdOutLine = null, Action onStdErrLine = null ) { var startInfo = new ProcessStartInfo() { - FileName = fileName, + FileName = processPath, #if NET40 || NET45 || NETSTANDARD2_0 || NETCOREAPP1_1 || NETCOREAPP2_0 - Arguments = arguments, + Arguments = processArguments, #endif UseShellExecute = false, CreateNoWindow = true, @@ -66,7 +324,7 @@ public static Process CreateProcess( RedirectStandardError = onStdErrLine != null, }; #if !NET40 && !NET45 && !NETSTANDARD2_0 && !NETCOREAPP1_1 && !NETCOREAPP2_0 - foreach ( var arg in arguments ) + foreach ( var arg in processArguments ) { startInfo.ArgumentList.Add( arg ); } @@ -100,6 +358,12 @@ public static Process CreateProcess( return p; } + /// + /// Starts this process and asynchronously writes data to standard input, if parameter is specified. + /// + /// The . + /// Optional callback to write to input. + /// Asynchronously returns void. public static async Task StartProcessAsync( Process p, Func stdinWriter = null @@ -111,7 +375,6 @@ public static async Task StartProcessAsync( p.BeginOutputReadLine(); p.BeginErrorReadLine(); - // Pass serialized configuration via stdin if ( redirectStdIn ) { using ( var stdin = p.StandardInput ) @@ -119,117 +382,34 @@ public static async Task StartProcessAsync( await stdinWriter( stdin ); } } - } - } - - public static class ProcessMonitorWithGracefulCancelability - { - public static TimeSpan DefaultShutdownSemaphoreWaitTime = TimeSpan.FromSeconds( 1 ); - - public static async Task> CallProcessAndGetResultAsync( - String processPath, - String shutdownSemaphoreName, - String -#if !NET40 && !NET45 && !NETSTANDARD2_0 && !NETCOREAPP1_1 && !NETCOREAPP2_0 - [] -#endif - processArguments, - Func inputWriter, - Func outputDeserializer, - CancellationToken token, - TimeSpan? shutdownSemaphoreWaitTime = default - ) - { - - (var stdinWriter, var stdinSuccess) = GetStdInWriter( inputWriter ); - (var exitCode, var stdout, var stderr) = await processPath.ExecuteAsFileAtThisPathWithCancelabilityCollectingOutputToString( - processArguments, - token, - shutdownSemaphoreName, - shutdownSemaphoreWaitTime ?? DefaultShutdownSemaphoreWaitTime, - stdinWriter: stdinWriter - ); - - String GetErrorString() - { - var errorString = stderr.ToString(); - return String.IsNullOrEmpty( errorString ) ? - ( exitCode == 0 ? "Unspecified error" : $"Non-zero return code of {processPath}" ) : - errorString; - } - - return stderr.Length > 0 || !stdinSuccess() || exitCode != 0 ? - new EitherOr( GetErrorString() ) : - outputDeserializer( stdout ); - } - - public static async Task CallProcessAndStreamOutputAsync( - String processPath, - String shutdownSemaphoreName, - String -#if !NET40 && !NET45 && !NETSTANDARD2_0 && !NETCOREAPP1_1 && !NETCOREAPP2_0 - [] -#endif - processArguments, - Func inputWriter, - Func onStdOutOrErrLine, - CancellationToken token, - TimeSpan? shutdownSemaphoreWaitTime = default - ) - { - (var stdinWriter, var stdinSuccess) = GetStdInWriter( inputWriter ); - var returnCode = await processPath.ExecuteAsFileAtThisPathWithCancelabilityAndRedirects( - processArguments, - token, - shutdownSemaphoreName, - shutdownSemaphoreWaitTime ?? DefaultShutdownSemaphoreWaitTime, - stdinWriter: stdinWriter, - onStdOutOrErrLine: onStdOutOrErrLine - ); - - return stdinSuccess() ? - returnCode : - default; } - private static (Func StdInWriter, Func StdInSuccess) GetStdInWriter( - Func inputWriter - ) - { - var stdinSuccess = false; - return ( - async stdin => - { - try - { - await inputWriter( stdin ); - stdinSuccess = true; - } - catch - { - // Ignore - } - } - , - () => stdinSuccess - ); - } - - + /// + /// This utility method will start the given and then will wait for it to complete, while keeping an eye for the given . + /// + /// This . + /// The name of the semaphore used to signal graceful shutdown after cancellation. Do not use "Global\" prefix. + /// The to use. + /// The optional callback to write input to the process. If no value is specified, the process will not have standard input. + /// The maximum time to wait for process to gracefully terminate after the cancellation token is canceled. By default, is value of . + /// The optional callback to perform some action between polling for process state. + /// Asynchronously returns the process exit code. Will return null if given is canceled. + /// + /// Typically the to start is created by . + /// public static async Task StartAndWaitForExitAsync( - this Process process, // Typically created by ProcessMonitor.CreateProcess(), just don't start it! - // String shutdownSemaphoreArgumentName, - CancellationToken token, + Process process, String shutdownSemaphoreName, - TimeSpan shutdownSemaphoreMaxWaitTime, - // Boolean cancelabilityIsOptional = false, + CancellationToken token, Func stdinWriter = null, + TimeSpan? shutdownSemaphoreMaxWaitTime = null, Func onTick = null ) { Int32? exitCode = null; + var maxTime = shutdownSemaphoreMaxWaitTime ?? ProcessMonitorWithGracefulCancelability.DefaultShutdownSemaphoreWaitTime; using ( var shutdownSemaphore = token.CanBeCanceled ? ShutdownSemaphoreFactory.CreateSignaller( shutdownSemaphoreName ) : default ) - using ( process ) // var process = ProcessMonitor.CreateProcess( fileName, arguments, onStdOutLine, onStdErrLine ) ) + using ( process ) { process.EnableRaisingEvents = true; @@ -295,7 +475,7 @@ void OnCancel() // Now, check if restart semaphore has been signalled // restart = restartSemaphore != null && restartSemaphore.WaitOne( 0 ); } - else if ( shutdownSignalledTime.HasValue && DateTime.UtcNow - shutdownSignalledTime.Value > shutdownSemaphoreMaxWaitTime ) + else if ( shutdownSignalledTime.HasValue && DateTime.UtcNow - shutdownSignalledTime.Value > maxTime ) { // We have signalled shutdown, but process has not exited in time try @@ -324,13 +504,16 @@ void OnCancel() } } - try - { - exitCode = process.ExitCode; - } - catch - { - // Ignore + if ( !token.IsCancellationRequested ) + { + try + { + exitCode = process.ExitCode; + } + catch + { + // Ignore + } } } @@ -338,92 +521,5 @@ void OnCancel() return exitCode; } - - public static async Task<(Int32? ReturnCode, StringBuilder StdOut, StringBuilder StdErr)> ExecuteAsFileAtThisPathWithCancelabilityCollectingOutputToString( - this String fileName, - String -#if !NET40 && !NET45 && !NETSTANDARD2_0 && !NETCOREAPP1_1 && !NETCOREAPP2_0 - [] -#endif - arguments, - CancellationToken token, - String shutdownSemaphoreName, - TimeSpan shutdownSemaphoreMaxWaitTime, - Func stdinWriter = null - ) - { - var stdout = new StringBuilder(); - var stderr = new StringBuilder(); - var retVal = await fileName.ExecuteAsFileAtThisPathWithCancelabilityAndRedirects( - arguments, - token, - shutdownSemaphoreName, - shutdownSemaphoreMaxWaitTime, - stdinWriter: stdinWriter, - onStdOutOrErrLine: ( line, isError ) => - { - ( isError ? stderr : stdout ).Append( line ).Append( '\n' ); - return null; - } ); - return (retVal, stdout, stderr); - } - - public static async Task ExecuteAsFileAtThisPathWithCancelabilityAndRedirects( - this String fileName, - String -#if !NET40 && !NET45 && !NETSTANDARD2_0 && !NETCOREAPP1_1 && !NETCOREAPP2_0 - [] -#endif - arguments, - CancellationToken token, - String shutdownSemaphoreName, - TimeSpan shutdownSemaphoreMaxWaitTime, - Func stdinWriter = null, - Func onStdOutOrErrLine = null - ) - { - var processOutput = new ConcurrentQueue<(Boolean IsError, DateTime Timestamp, String Data)>(); - try - { - return await ProcessUtils.CreateProcess( - fileName, - arguments, - onStdOutLine: outLine => processOutput.Enqueue( (false, DateTime.UtcNow, outLine) ), - onStdErrLine: errLine => processOutput.Enqueue( (true, DateTime.UtcNow, errLine) ) ) - .StartAndWaitForExitAsync( - token, - shutdownSemaphoreName, - shutdownSemaphoreMaxWaitTime, - stdinWriter: stdinWriter, - onTick: async () => await ProcessOutput( processOutput, onStdOutOrErrLine ) - ); - } - finally - { - // Flush any 'leftover' messages - await ProcessOutput( processOutput, onStdOutOrErrLine ); - } - } - - private static async Task ProcessOutput( - ConcurrentQueue<(Boolean IsError, DateTime Timestamp, String Data)> processOutput, - Func onStdOutOrErrLine - ) - { - if ( onStdOutOrErrLine == null ) - { - onStdOutOrErrLine = ( line, isError ) => ( isError ? Console.Error : Console.Out ).WriteLineAsync( line ); - } - - while ( processOutput.TryDequeue( out var output ) ) - { - var t = onStdOutOrErrLine( output.Data, output.IsError ); - if ( t != null ) - { - await t; - } - } - } - } } \ No newline at end of file diff --git a/Source/Code/UtilPack.ProcessMonitor/UtilPack.ProcessMonitor.csproj b/Source/Code/UtilPack.ProcessMonitor/UtilPack.ProcessMonitor.csproj index ed37fc1..6e8605b 100644 --- a/Source/Code/UtilPack.ProcessMonitor/UtilPack.ProcessMonitor.csproj +++ b/Source/Code/UtilPack.ProcessMonitor/UtilPack.ProcessMonitor.csproj @@ -22,7 +22,7 @@ 1.0.0 $(AssemblyName) - Easy-to-use API for spawning processes and waiting for their exit while keeping an eye on CancellationToken. + Easy-to-use API for spawning processes and waiting for their exit while keeping an eye on CancellationToken. Typically, extension and utitily methods of ProcessMonitorWithGracefulCancelability class from this package are used.