The programming model for asynchronous I/O is an alien beast. You call BeginRead, specifying a callback function to be invoked asynchronously when the read operation completes (or the stream is closed). But you're not given the actual data -- for that, you have to call back into the stream's EndRead method, and then re-establish the async read by calling BeginRead again. It's really quite a chore, because you have to have cached that Stream object somewhere... Uncle!
Doesn't .NET have a nice, tidy event model? Yes it does. Can't we just subscribe to receive the data as it comes in, without writing two pages of boilerplate code juggling streams, buffers, and IAsyncResult references? Yes we can.
Submitted for your approval: AsyncStreamPump, my attempt at a reusable solution for this problem... useful for large file copies, downloads, uploads -- can even be used to proxy a network connection, with just a few lines of client code!
using System;
using System.IO;
using System.Threading;
namespace Jitsu.IO
{
/// <summary>
/// Fits two streams together -- data received at one end is sent down the other.
/// </summary>
/// <remarks>
/// The data flow is in one direction only: from Inbound to Outbound. A second
/// AsyncStreamPump instance can be attached to the same two streams, polarity
/// reversed, to facilitate bidirectional data flow -- e.g. for proxying a network
/// protocol.
/// <para />
/// Reads are issued with BeginRead/EndRead. A read that completes synchronously is
/// consumed by a loop in the code that issued it rather than by re-entering the
/// callback, so the stack depth stays constant no matter how large the transfer or
/// how small the buffer. While reads keep completing synchronously, the thread that
/// issued them (possibly the caller of <see cref="Start"/>) does the pumping.
/// <para />
/// Event handlers and writes to the outbound stream run on whichever thread
/// completes the read.
/// </remarks>
public class AsyncStreamPump
{
    #region Initialization

    /// <summary>Initializes a new instance of the AsyncStreamPump class.</summary>
    /// <param name="buffersize">Size, in bytes, of the read buffer. Must be greater
    /// than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="buffersize"/> is
    /// zero or negative.</exception>
    public AsyncStreamPump(int buffersize)
    {
        // A zero-length buffer would make the very first read return 0, which the pump
        // interprets as end-of-stream; reject it (and negative sizes) up front.
        if (buffersize <= 0)
        {
            throw new ArgumentOutOfRangeException("buffersize", "Buffer size must be greater than zero.");
        }

        this.buffersize = buffersize;
        this.buffer = new byte[buffersize];
        this.donesignal = new ManualResetEvent(false);
        this.done = false;
    }

    /// <summary>Initializes a new instance of the AsyncStreamPump class with an
    /// 8192-byte read buffer.</summary>
    public AsyncStreamPump() : this(8192)
    { }

    #endregion

    #region Properties

    /// <summary>Gets or sets the source stream.</summary>
    /// <exception cref="IOException">The value is a FileStream that was not opened
    /// for asynchronous access.</exception>
    public Stream InboundStream
    {
        get
        { return inbound; }
        set
        {
            // FileStream is the one stream type that lets us ask whether it was opened
            // for asynchronous I/O; one that was not is rejected. Too bad this property
            // isn't higher up in the class hierarchy.
            FileStream file = value as FileStream;
            if (file != null && !file.IsAsync)
            {
                throw new IOException("Inbound stream must support asynchronous access.");
            }

            inbound = value;
        }
    }
    private Stream inbound;

    /// <summary>Gets or sets the destination stream.</summary>
    /// <remarks>May be left null, in which case received data is only reported through
    /// the DataReceived event.</remarks>
    public Stream OutboundStream
    {
        get
        { return outbound; }
        set
        { outbound = value; }
    }
    private Stream outbound;

    /// <summary>Enables or disables automatically flushing each write to the
    /// outbound stream.</summary>
    public bool AutoFlush
    {
        get
        { return autoflush; }
        set
        { autoflush = value; }
    }
    private bool autoflush;

    /// <summary>Determines whether the pump has reached the end of the source (inbound)
    /// stream.</summary>
    public bool IsDone
    {
        get
        { return done; }
    }
    // Written by the thread that completes the final read and polled by others,
    // hence volatile.
    private volatile bool done;

    /// <summary>Gets a waitable object which will be signaled when the pump is
    /// finished.</summary>
    public WaitHandle DoneSignal
    {
        get
        { return donesignal; }
    }
    private readonly ManualResetEvent donesignal;

    #endregion

    #region Methods

    /// <summary>Start the pump, by issuing the first BeginRead call on the source
    /// (inbound) stream.</summary>
    /// <remarks>If the inbound stream completes its reads synchronously, this method
    /// pumps data on the calling thread until a read completes asynchronously or the
    /// end of the stream is reached.</remarks>
    /// <exception cref="InvalidOperationException">InboundStream has not been
    /// set.</exception>
    public void Start()
    {
        if (inbound == null)
        {
            throw new InvalidOperationException("InboundStream must be set before the pump is started.");
        }

        PumpReads();
    }

    #endregion

    #region Events

    /// <summary>Represents the method that will handle the DataReceived event.</summary>
    /// <remarks>The data is passed by reference -- handlers are able to add, remove, or
    /// modify the data. Whatever array the handlers leave behind is written, in full,
    /// to the outbound stream; a null or empty array suppresses the write.</remarks>
    public delegate void DataReceivedEventHandler(AsyncStreamPump sender, ref byte[] data);

    /// <summary>Occurs when a block of async data is received.</summary>
    public event DataReceivedEventHandler DataReceived;

    /// <summary>Represents the method that will handle the EndOfStream event.</summary>
    public delegate void EndOfStreamEventHandler(AsyncStreamPump sender);

    /// <summary>Occurs when the end of a stream is reached.</summary>
    public event EndOfStreamEventHandler EndOfStream;

    #endregion

    #region Implementation

    private readonly int buffersize;
    private readonly byte[] buffer;

    // Issues reads on the inbound stream. A read that completes synchronously (its
    // IAsyncResult reports CompletedSynchronously) is processed right here and the
    // next read is issued by the loop; a read that completes asynchronously is left
    // to OnDataReceived, which calls back into this method to continue.
    private void PumpReads()
    {
        while (true)
        {
            IAsyncResult iar = inbound.BeginRead(buffer, 0, buffersize, new AsyncCallback(OnDataReceived), inbound);

            if (!iar.CompletedSynchronously)
            {
                // The callback owns this read; nothing more to do on this thread.
                return;
            }

            if (!ProcessCompletedRead(iar))
            {
                // End of stream -- the pump has shut itself down.
                return;
            }
        }
    }

    // Callback invoked by the inbound stream when a read completes.
    private void OnDataReceived(IAsyncResult iar)
    {
        // A synchronously completed read is handled by the loop in PumpReads that
        // issued it. Handling it here as well would process it twice and would grow
        // the stack by one level per chunk.
        if (iar.CompletedSynchronously)
        {
            return;
        }

        if (ProcessCompletedRead(iar))
        {
            PumpReads();
        }
    }

    // Finishes one read: forwards the data (if any) to the event handlers and the
    // outbound stream. Returns true if the pump should issue another read, or false
    // if the end of the inbound stream was reached and the pump has shut down.
    private bool ProcessCompletedRead(IAsyncResult iar)
    {
        // Finalize the async op -- either gets data, or reports eof status by returning
        // zero. (We use the CanRead propget to avoid calling EndRead in the event the
        // stream has already been disposed of.)
        int n = 0;
        if (inbound.CanRead)
        {
            n = inbound.EndRead(iar);
        }

        if (n <= 0)
        {
            Shutdown();
            return false;
        }

        // Hand clients a private copy so they can eavesdrop on, or rewrite, the chunk
        // without touching the buffer used for the next read.
        byte[] data = new byte[n];
        Array.Copy(buffer, data, n);
        FireDataReceived(ref data);

        // Handlers may have swapped in an array of a different length (or null), so
        // the write is sized from the array itself, not from the number of bytes read.
        // The outbound stream may also be absent or already closed.
        if (data != null && data.Length > 0 && outbound != null && outbound.CanWrite)
        {
            outbound.Write(data, 0, data.Length);
            if (autoflush)
            {
                outbound.Flush();
            }
        }

        return true;
    }

    // Called once the inbound stream is exhausted: notifies listeners, closes the
    // outbound stream in turn, then marks the pump as done and releases waiters.
    private void Shutdown()
    {
        FireEndOfStream();

        if (outbound != null)
        {
            outbound.Close();
        }

        done = true;
        donesignal.Set();
    }

    // Safe event-firing helper methods. Each takes a local snapshot of the delegate
    // so that a handler unsubscribing on another thread between the null check and
    // the call cannot cause a NullReferenceException.
    private void FireDataReceived(ref byte[] data)
    {
        DataReceivedEventHandler handler = DataReceived;
        if (handler != null)
        {
            handler(this, ref data);
        }
    }

    private void FireEndOfStream()
    {
        EndOfStreamEventHandler handler = EndOfStream;
        if (handler != null)
        {
            handler(this);
        }
    }

    #endregion
}
}
