Expand description
Adaptors from AsyncRead
/AsyncWrite
to Stream/Sink
Raw I/O objects work with byte sequences, but higher-level code usually wants to batch these into meaningful chunks, called “frames”.
This module contains adapters to go from streams of bytes, AsyncRead
and
AsyncWrite
, to framed streams implementing Sink
and Stream
.
Framed streams are also known as transports.
§Example encoding using LinesCodec
The following example demonstrates how to use a codec such as LinesCodec
to
write framed data. FramedWrite
can be used to achieve this. Data sent to
FramedWrite
are first framed according to a specific codec, and then sent to
an implementor of AsyncWrite
.
use futures::sink::SinkExt;
use tokio_util::codec::LinesCodec;
use tokio_util::codec::FramedWrite;
#[tokio::main]
async fn main() {
let buffer = Vec::new();
let messages = vec!["Hello", "World"];
let encoder = LinesCodec::new();
// FramedWrite is a sink which means you can send values into it
// asynchronously.
let mut writer = FramedWrite::new(buffer, encoder);
// To be able to send values into a FramedWrite, you need to bring the
// `SinkExt` trait into scope.
writer.send(messages[0]).await.unwrap();
writer.send(messages[1]).await.unwrap();
let buffer = writer.get_ref();
assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes());
}
§Example decoding using LinesCodec
The following example demonstrates how to use a codec such as LinesCodec
to
read a stream of framed data. FramedRead
can be used to achieve this. FramedRead
will keep reading from an AsyncRead
implementor until a whole frame, according to a codec,
can be parsed.
use tokio_stream::StreamExt;
use tokio_util::codec::LinesCodec;
use tokio_util::codec::FramedRead;
#[tokio::main]
async fn main() {
let message = "Hello\nWorld".as_bytes();
let decoder = LinesCodec::new();
// FramedRead can be used to read a stream of values that are framed according to
// a codec. FramedRead will read from its input (here `buffer`) until a whole frame
// can be parsed.
let mut reader = FramedRead::new(message, decoder);
// To read values from a FramedRead, you need to bring the
// `StreamExt` trait into scope.
let frame1 = reader.next().await.unwrap().unwrap();
let frame2 = reader.next().await.unwrap().unwrap();
assert!(reader.next().await.is_none());
assert_eq!(frame1, "Hello");
assert_eq!(frame2, "World");
}
§The Decoder trait
A Decoder
is used together with FramedRead
or Framed
to turn an
AsyncRead
into a Stream
. The job of the decoder trait is to specify
how sequences of bytes are turned into a sequence of frames, and to
determine where the boundaries between frames are. The job of the
FramedRead
is to repeatedly switch between reading more data from the IO
resource, and asking the decoder whether we have received enough data to
decode another frame of data.
The main method on the Decoder
trait is the decode
method. This method
takes as argument the data that has been read so far, and when it is called,
it will be in one of the following situations:
- The buffer contains less than a full frame.
- The buffer contains exactly a full frame.
- The buffer contains more than a full frame.
In the first situation, the decoder should return Ok(None)
.
In the second situation, the decoder should clear the provided buffer and
return Ok(Some(the_decoded_frame))
.
In the third situation, the decoder should use a method such as split_to
or advance
to modify the buffer such that the frame is removed from the
buffer, but any data in the buffer after that frame should still remain in
the buffer. The decoder should also return Ok(Some(the_decoded_frame))
in
this case.
Finally the decoder may return an error if the data is invalid in some way. The decoder should not return an error just because it has yet to receive a full frame.
It is guaranteed that, from one call to decode
to another, the provided
buffer will contain the exact same data as before, except that if more data
has arrived through the IO resource, that data will have been appended to
the buffer. This means that reading frames from a FramedRead
is
essentially equivalent to the following loop:
use tokio::io::AsyncReadExt;
let mut buf = bytes::BytesMut::new();
loop {
// The read_buf call will append to buf rather than overwrite existing data.
let len = io_resource.read_buf(&mut buf).await?;
if len == 0 {
while let Some(frame) = decoder.decode_eof(&mut buf)? {
yield frame;
}
break;
}
while let Some(frame) = decoder.decode(&mut buf)? {
yield frame;
}
}
The example above uses yield
whenever the Stream
produces an item.
§Example decoder
As an example, consider a protocol that can be used to send strings where each frame is a four byte integer that contains the length of the frame, followed by that many bytes of string data. The decoder fails with an error if the string data is not valid utf-8 or too long.
Such a decoder can be written like this:
use tokio_util::codec::Decoder;
use bytes::{BytesMut, Buf};
struct MyStringDecoder {}
const MAX: usize = 8 * 1024 * 1024;
impl Decoder for MyStringDecoder {
type Item = String;
type Error = std::io::Error;
fn decode(
&mut self,
src: &mut BytesMut
) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < 4 {
// Not enough data to read length marker.
return Ok(None);
}
// Read length marker.
let mut length_bytes = [0u8; 4];
length_bytes.copy_from_slice(&src[..4]);
let length = u32::from_le_bytes(length_bytes) as usize;
// Check that the length is not too large to avoid a denial of
// service attack where the server runs out of memory.
if length > MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", length)
));
}
if src.len() < 4 + length {
// The full string has not yet arrived.
//
// We reserve more space in the buffer. This is not strictly
// necessary, but is a good idea performance-wise.
src.reserve(4 + length - src.len());
// We inform the Framed that we need more bytes to form the next
// frame.
return Ok(None);
}
// Use advance to modify src such that it no longer contains
// this frame.
let data = src[4..4 + length].to_vec();
src.advance(4 + length);
// Convert the data to a string, or fail if it is not valid utf-8.
match String::from_utf8(data) {
Ok(string) => Ok(Some(string)),
Err(utf8_error) => {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
utf8_error.utf8_error(),
))
},
}
}
}
§The Encoder trait
An Encoder
is used together with FramedWrite
or Framed
to turn
an AsyncWrite
into a Sink
. The job of the encoder trait is to
specify how frames are turned into a sequences of bytes. The job of the
FramedWrite
is to take the resulting sequence of bytes and write it to the
IO resource.
The main method on the Encoder
trait is the encode
method. This method
takes an item that is being written, and a buffer to write the item to. The
buffer may already contain data, and in this case, the encoder should append
the new frame the to buffer rather than overwrite the existing data.
It is guaranteed that, from one call to encode
to another, the provided
buffer will contain the exact same data as before, except that some of the
data may have been removed from the front of the buffer. Writing to a
FramedWrite
is essentially equivalent to the following loop:
use tokio::io::AsyncWriteExt;
use bytes::Buf; // for advance
const MAX: usize = 8192;
let mut buf = bytes::BytesMut::new();
loop {
tokio::select! {
num_written = io_resource.write(&buf), if !buf.is_empty() => {
buf.advance(num_written?);
},
frame = next_frame(), if buf.len() < MAX => {
encoder.encode(frame, &mut buf)?;
},
_ = no_more_frames() => {
io_resource.write_all(&buf).await?;
io_resource.shutdown().await?;
return Ok(());
},
}
}
Here the next_frame
method corresponds to any frames you write to the
FramedWrite
. The no_more_frames
method corresponds to closing the
FramedWrite
with SinkExt::close
.
§Example encoder
As an example, consider a protocol that can be used to send strings where each frame is a four byte integer that contains the length of the frame, followed by that many bytes of string data. The encoder will fail if the string is too long.
Such an encoder can be written like this:
use tokio_util::codec::Encoder;
use bytes::BytesMut;
struct MyStringEncoder {}
const MAX: usize = 8 * 1024 * 1024;
impl Encoder<String> for MyStringEncoder {
type Error = std::io::Error;
fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
// Don't send a string if it is longer than the other end will
// accept.
if item.len() > MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", item.len())
));
}
// Convert the length into a byte array.
// The cast to u32 cannot overflow due to the length check above.
let len_slice = u32::to_le_bytes(item.len() as u32);
// Reserve space in the buffer.
dst.reserve(4 + item.len());
// Write the length and string to the buffer.
dst.extend_from_slice(&len_slice);
dst.extend_from_slice(item.as_bytes());
Ok(())
}
}
Re-exports§
pub use self::length_delimited::LengthDelimitedCodec;
pub use self::length_delimited::LengthDelimitedCodecError;
Modules§
- Frame a stream of bytes based on a length prefix
Structs§
FramedParts
contains an export of the data of a Framed transport. It can be used to construct a newFramed
with a different codec. It contains all current buffers and the inner transport.- A
Sink
of frames encoded to anAsyncWrite
.
Enums§
- An error occurred while encoding or decoding a chunk.
- An error occurred while encoding or decoding a line.
Traits§
- Decoding of frames via buffers.
- Trait of helper objects to write out messages as bytes, for use with
FramedWrite
.