Question: If you had to transfer a large amount of data from A to B, how would you do it? Dealing with I/O-intensive big data workload challenges: velocity.
Introduction
In Part I of this series we acknowledged that data has quickly become one of the world's most valuable assets. It powers social media, Large Language Models (LLMs), trading screens, security & risk analysis, manufacturing, healthcare, retail, advertising, agriculture, and more.
A few key traits pose engineering challenges beyond the norm: volume (it arrives in enormous quantities), velocity (it is created quickly, usually in real time), and variety (it comes structured, semi-structured, and unstructured).
Previously we discussed intensive IO: getting volumes of data moving. However, data can diminish in relevance if we can't handle the often extreme velocity of data production, or if latency isn't managed effectively.
Velocity
So far, for handling data volume we've been leveraging system calls1 like open/openat, lseek, close, read, and write. In this article, we'll be introduced to some more.
Using Memory Wisely
Memory Mapping
When dealing with larger files, most processes tend to utilize mmap(), a system call that creates a direct mapping for the file in the virtual memory space of the process. This means the application can access it directly without additional read(), write(), or lseek() calls, and without copying into an extra buffer. Behind the scenes the kernel will still handle any swapping in/out of pages so that the accesses work as intended. Memory is unmapped using munmap().
In these examples we use mmap() to load and traverse files up to 4GB in size without issuing extra read() calls or context switching between user and kernel space:
Example: Rust
As of the time of writing, Rust (focused on memory safety) doesn't support mmap() in its standard library and requires a crate (memmap2) or a custom FFI wrapper to use it.
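By way of illustration only (this is not the article's original listing), a minimal sketch using the memmap2 crate might look like the following; the file name and the line-counting traversal are placeholders:

```rust
use std::fs::File;
use memmap2::Mmap; // external crate: memmap2

fn main() -> std::io::Result<()> {
    // The file name is a placeholder.
    let file = File::open("testfile.bin")?;

    // Map the whole file into this process's virtual address space.
    // Safety: we assume no other process truncates the file while it is mapped.
    let mmap = unsafe { Mmap::map(&file)? };

    // Traverse the mapping directly: no further read() calls are issued;
    // the kernel pages data in on demand as bytes are touched.
    let newlines = mmap.iter().filter(|&&b| b == b'\n').count();
    println!("{} lines", newlines);
    Ok(())
}
```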
strace for a 100K file shows no more calls to read() whilst processing:
strace for a 4GB file:

When running on a 4GB file it keeps 1GB resident in the cache (without tuning):
Example: Java
Executing the app with an strace on a 100K file shows we've said goodbye to read():

A check with vmtouch or fincore will show that all pages were loaded into memory (note: the same also occurs if only a single byte is read).

After execution on a 1G file, the strace output doesn't differ, but the number of virtual pages cached is given:

Note: as of writing, Java 23 has a 2GB limit on the mmap size, so a sliding window would be required for files larger than that.
mmap() implements demand paging: a lazy loading technique where the kernel only copies data into physical memory when there is an attempt to access it. The memory-mapped area seen by the process reads directly from the page cache without requiring a copy.
Fig 1: Using mmap() for fast local reads in user space.
When an mmap() is created with the MAP_SHARED flag it can be shared with other processes. That means velocity! Velocity through durable Inter-Process Communication (IPC). One process can write whilst the other reads, like in an ETL pipeline, or both can read/write to the same file (or, rather, virtual memory space). This can be complicated when it comes to managing synchronization2 and efficient reads, but a common technique is to use append-only data structures and write-ahead logs.
Fig 2: Using mmap() for durable shared memory reads/writes between processes.
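To make that concrete, here is a minimal sketch of the writer side using the memmap2 crate, whose map_mut() produces a MAP_SHARED mapping. The file name, size, and record layout are illustrative only, and real code needs a proper synchronization scheme between the two processes:

```rust
use std::fs::OpenOptions;
use memmap2::MmapMut; // external crate: memmap2

fn main() -> std::io::Result<()> {
    // Writer side: open (or create) the backing file and give it a size.
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open("ipc.log")?;
    file.set_len(4096)?;

    // map_mut() produces a MAP_SHARED mapping, so another process mapping
    // the same file sees these writes (durably, once flushed).
    let mut map = unsafe { MmapMut::map_mut(&file)? };

    // Append-only / write-ahead style: write the record first, then publish
    // its length in the header. Real code needs a synchronization scheme
    // (e.g. a futex, as mentioned in the footnotes).
    let record = b"event-1\n";
    map[8..8 + record.len()].copy_from_slice(record);
    map[..8].copy_from_slice(&(record.len() as u64).to_le_bytes());

    // msync() the dirty pages back to the file for durability.
    map.flush()?;
    Ok(())
}
```

A reader process would map the same file (read-only) and poll the length header before consuming the record.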
For same-device communication between processes this technique can provide greater velocity than either domain sockets or network sockets over TCP. You would be forgiven for reaching for something dedicated and optimized for that, like Kafka, gRPC, ZeroMQ, or Chronicle (in no particular order), but then you'd miss out on the Linux tricks. Note: if the data did not need to be durable we could use shm (shared memory).
Time and Space
If we wanted to move a large amount of data from point A to point B and there was no processing involved, no need for the CPU, then how would we do it? Big data poses greater challenges for data replication and redundancy than usual, and the velocity at which data is produced requires a means to move it to where it needs to be as quickly as possible.
We might be tempted to reach for our read() and write() operations, but perhaps prime the cache before the calls with an mmap(). In fact, if you run an strace on a cp command you may find that's exactly what it does.
Example: cp command
The Linux cp command will run an mmap() prior to running as many read() and write() calls as necessary to copy the contents of file A to file B:

cp testfile.bin nextfile.bin
That, however, is not fast enough. Why do we need CPU copying and processing when we are not inspecting the contents of the file?
In order to save time, we need to save space: user space. Avoiding user space entirely apart from an initiating call and staying close to the hardware is the key to extracting more speed. Fortunately, Linux provides the means to do so.
Three additional system calls worth our consideration are copy_file_range(), sendfile(), and splice(). All of these calls allow for copies of data without unnecessarily going via user space, so long as the files support it3. (Note: YMMV: sendfile() has a 2GB limitation on offset, hence sendfile64(). Using copy_file_range() effectively within its limitations may also require multiple calls.) They are known more generally as zero-copy techniques: calls that require either zero or at least minimal copying.
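Before the examples, a rough sketch of what driving copy_file_range() from Rust via the libc crate can look like (the file names are placeholders, and the loop is there because a single call may copy fewer bytes than requested):

```rust
use std::fs::File;
use std::os::unix::io::AsRawFd;

fn main() -> std::io::Result<()> {
    // Source and destination names are placeholders.
    let src = File::open("testfile.bin")?;
    let dst = File::create("nextfile.bin")?;
    let mut remaining = src.metadata()?.len() as usize;

    // A single copy_file_range() may copy fewer bytes than requested,
    // so loop until the whole file has been transferred.
    while remaining > 0 {
        let copied = unsafe {
            libc::copy_file_range(
                src.as_raw_fd(),
                std::ptr::null_mut(), // let the kernel use/advance the file offsets
                dst.as_raw_fd(),
                std::ptr::null_mut(),
                remaining,
                0,
            )
        };
        if copied < 0 {
            return Err(std::io::Error::last_os_error());
        }
        if copied == 0 {
            break; // unexpected end of file
        }
        remaining -= copied as usize;
    }
    Ok(())
}
```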
Example: Rust
The strace for the transfer of a 4GB file:
Example: Java
Java exposes copy_file_range() via its FileChannel API:

The strace output for a transfer of a 100K file:

…for a 2GB file:
…for a 4GB file:
Fig 3: Using copy_file_range() to zero-copy a file between two storage locations without going via user space.
Shared Nothing
The techniques we've looked at so far imply vertical scaling to process larger and larger data sets: scaling up and potentially sharing more and more memory to deal with larger workloads. The problem, though, with a vertically scaled, shared-memory approach is cost and fault tolerance. Importantly, cost grows faster when vertically scaling one machine than with horizontal scaling, i.e. scaling out into a “shared nothing” architecture4. Furthermore, if that single system goes down it takes down everything with it.
In a shared nothing architecture, we can’t rely on shared memory within a single system alone to process data quickly. This requires fast transfer of data not just within a system but between systems too.
Thankfully, Linux treats sockets the same way it treats files: as just another file descriptor. In that way, the same system calls apply, so very little changes in our approach. In our examples we use netcat5 to listen for the transfer, to keep them simple.
Example: Rust
At the time of writing, Rust doesn't have a generic call which leverages sendfile() for socket transfers. io::copy will send between a file and a socket with multiple read() and sendto() calls, similar to how the cp command works with local files. It produces strace output similar to the following for a 100K file:

However, if we introduce the nix crate, then we can use the call directly ourselves (using sendfile64 in this case). Then when we run strace on a 100K file we get output like the following:

…and on a 4GB file:
Example: Java
Code example for sending to the socket. On the other end we leverage netcat to listen for the transfer with netcat -l 2015.

Note: when running an strace to send a 100K file, the Java application first tries (and fails) to run copy_file_range, then falls back to sendfile. This occurs because copy_file_range does not work on sockets.
Fig 4: Using sendfile() to zero-copy data over TCP sockets without going via user space.
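For reference, here is a minimal sketch of the sendfile() approach in Rust, using the libc crate rather than the nix crate mentioned above. The file name, host, and port are placeholders, and a netcat -l 2015 listener is assumed on the other end:

```rust
use std::fs::File;
use std::net::TcpStream;
use std::os::unix::io::AsRawFd;

fn main() -> std::io::Result<()> {
    // File name, host, and port are placeholders.
    let file = File::open("testfile.bin")?;
    let socket = TcpStream::connect("127.0.0.1:2015")?;
    let mut remaining = file.metadata()?.len() as usize;

    // sendfile() moves data from the file's page cache to the socket inside
    // the kernel; the loop handles short writes and the per-call size limit.
    while remaining > 0 {
        let sent = unsafe {
            libc::sendfile(
                socket.as_raw_fd(),
                file.as_raw_fd(),
                std::ptr::null_mut(), // use and advance the in-file offset
                remaining,
            )
        };
        if sent < 0 {
            return Err(std::io::Error::last_os_error());
        }
        if sent == 0 {
            break; // unexpected end of file
        }
        remaining -= sent as usize;
    }
    Ok(())
}
```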
When we look at how the transfer works within the kernel, it’s clear there is still opportunity to avoid copies. Network cards that support gather operations can avoid the extra copy within kernel space from the page cache to the network buffer by gathering directly at the NIC6. Further, on systems that support non-uniform memory access (NUMA) we can take advantage of the relative location of CPU to memory, or memory to the NIC to improve speed even further.
For modern applications hoping to leverage performance advantages beyond those considered so far, we have to look at kernel-bypass techniques (like DPDK) or go beyond conventional system calls using io_uring, which provide sub-microsecond I/O opportunities. We'll follow up with those in the future.
Modern I/O
Not all I/O problems present themselves as a simple data transfer from A to B without application intervention in user space. Modern interfaces enable I/O from user space whilst minimizing kernel overhead.
Ring buffers (aka circular queues/circular buffers) have proved their value as a foundational data structure for achieving low-latency, high-throughput communication in systems with constrained or fixed resources. Their predictable, mechanically sympathetic, mostly sequential memory access minimizes cache churn and takes full advantage of prefetching, making them very cache-friendly.
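To ground the idea, here is a deliberately simple, single-threaded ring buffer sketch in Rust; real implementations (including io_uring's) add atomics and memory barriers so that producers and consumers can run concurrently:

```rust
/// A minimal fixed-capacity ring buffer (illustrative only).
struct RingBuffer<T> {
    slots: Vec<Option<T>>,
    head: usize, // next slot to read
    tail: usize, // next slot to write
    len: usize,
}

impl<T> RingBuffer<T> {
    fn new(capacity: usize) -> Self {
        Self { slots: (0..capacity).map(|_| None).collect(), head: 0, tail: 0, len: 0 }
    }

    fn push(&mut self, item: T) -> Result<(), T> {
        if self.len == self.slots.len() {
            return Err(item); // full: apply back-pressure instead of reallocating
        }
        self.slots[self.tail] = Some(item);
        self.tail = (self.tail + 1) % self.slots.len(); // wrap around
        self.len += 1;
        Ok(())
    }

    fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        let item = self.slots[self.head].take();
        self.head = (self.head + 1) % self.slots.len();
        self.len -= 1;
        item
    }
}
```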
io_uring is a storage API for asynchronous I/O that enables the submission of one or more non-blocking I/O requests, bringing together many well-established ideas around asynchronous, high-performance storage I/O. It hosts ring buffers which are shared between user space and kernel space: a submission queue (SQ) for I/O requests and a completion queue (CQ) for I/O responses. This setup allows for efficient I/O while avoiding the overhead of excessive system calls and unnecessary copies.
The manual pages for io_uring describe how to use it directly, but we should7 leverage liburing, which simplifies the API. At a base level the main system calls we make use of are mmap(), io_uring_setup(), io_uring_enter(), and io_uring_register(). These calls are responsible for memory mapping, io_uring setup, submission of queued I/O events, and file/buffer registration, respectively.8
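Ahead of the full examples, a minimal single-read sketch using the community io-uring crate (an assumption on my part; it wraps the raw system calls much as liburing does) might look like this; the file name and buffer size are placeholders:

```rust
use io_uring::{opcode, types, IoUring};
use std::fs::File;
use std::os::unix::io::AsRawFd;

fn main() -> std::io::Result<()> {
    // io_uring_setup() and the mmap() of the rings happen here.
    let mut ring = IoUring::new(8)?; // 8 submission queue entries
    let file = File::open("testfile.bin")?;
    let mut buf = vec![0u8; 4096];

    // Build a read request and place it on the submission queue (SQ).
    let read_e = opcode::Read::new(types::Fd(file.as_raw_fd()), buf.as_mut_ptr(), buf.len() as _)
        .build()
        .user_data(0x42);
    unsafe {
        ring.submission().push(&read_e).expect("submission queue is full");
    }

    // One io_uring_enter() submits the request and waits for a completion.
    ring.submit_and_wait(1)?;

    // Reap the result from the completion queue (CQ).
    let cqe = ring.completion().next().expect("completion queue is empty");
    assert_eq!(cqe.user_data(), 0x42);
    println!("read {} bytes", cqe.result());
    Ok(())
}
```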
Rust
TODO()
Java
TODO()
Placeholder
Diagram
So far we've discussed both volume and velocity, but variety also poses unique I/O challenges for high-speed analytics and processing of big data. Data storage and serialization formats play an important part in modern engineering for making this possible in an efficient way. We'll discuss that topic next in Big Bytes, Big Data, Intensive IO Part III - Variety.
Disclaimer:
Any views and opinions expressed in this blog are based on my personal experiences and knowledge acquired throughout my career. They do not necessarily reflect the views of, or experiences at, my current or past employers.
Footnotes
1. For an understanding of user space/kernel space and system calls and how they are relevant to handling data, see Big Bytes, Big Data, Intensive IO Part I - Volume and related references. ↩
2. This synchronization could be done using another system call, futex, or, more expensively, with shm_open, but that is out of scope for this article. ↩
3. M. Kerrisk, The Linux Programming Interface: A Linux and UNIX System Programming Handbook. San Francisco: No Starch Press, 2010. ↩
4. M. Kleppmann, Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems. First edition. Boston: O'Reilly Media, 2017. ↩
5. Netcat is usually available with one of the commands netcat or nc. If not, it can be installed with sudo apt-get install netcat. Alternatively, one can create a server socket to listen for the transfer on the other end. ↩
6. D. Stancevic, “Zero Copy I: User-Mode Perspective,” www.linuxjournal.com. Available: https://www.linuxjournal.com/article/6345 [Accessed: Dec. 09, 2024] ↩
7. The API author's advice is “Don't be a hero” by using the direct API yourself; use liburing instead. Doing so will save some of the additional boilerplate of ring setup and I/O submission, as well as memory barriers/fencing when reading from or writing to ring buffers. https://kernel-recipes.org/en/2019/talks/faster-io-through-io_uring/ ↩
8. J. Axboe, “Efficient IO with io_uring.” Available: https://kernel.dk/io_uring.pdf [Accessed: Dec. 11, 2024] ↩