Building a User-Defined Function implementation for Apache Arrow and WebAssembly - Part 2: The implementation
Let's get some nuts and bolts
Introduction
This is a three-part blog post about a small venture of mine into building a UDF implementation using Apache Arrow and Wasmtime.
Following on from the previous post, where we introduced some of the problems we’d like to tackle as well as some auxiliary problems that have already come up, this post aims for a basic implementation of what we discussed.
Initial Setup
The host
We’re going to start a basic binary cargo project:
cargo new wasm_executor
This will set up a new Cargo project that compiles to a standard binary.
As per Wasmtime’s documentation, it’s pretty straightforward to get set up with a host implementation:
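use wasmtime::{Config, Engine, Linker, Module, OptLevel, Store};

fn main() {
    // Favour debuggability over speed while we experiment.
    let engine = Engine::new(
        Config::new()
            .debug_info(true)
            .coredump_on_trap(true)
            .cranelift_opt_level(OptLevel::None),
    )
    .unwrap();
    let module = Module::new(
        &engine,
        /* Wasm module code here */
    ).unwrap();
    let linker = Linker::new(&engine);
    // The store carries host state of type u32 (here, the value 4).
    let mut store: Store<u32> = Store::new(&engine, 4);
    let instance = linker.instantiate(&mut store, &module).unwrap();
    // Look up the module's exported linear memory by name.
    let mut memory = instance
        .get_memory(&mut store, "memory").unwrap();
}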
It’s really as simple as that. A few of the concepts used here (stores, linkers, memories and instances) are worth looking up in the Wasmtime documentation. Beyond that, all we need is the compiled bytecode of the Wasm module we want to run.
A quick note on the get_memory call above: the name “memory” comes from the export name that shows up when the compiled module is converted to the WAT text format:
(table $T0 202 202 funcref)
(memory $memory (export "memory") 17)
(global $__stack_pointer (mut i32) (i32.const 1048576))
(global $__data_end (export "__data_end") i32 (i32.const 1087144))
I am not certain about what the general conventions are here, but for this project, this will do :).
The module
Now that we have our runtime, we should probably get to work on our wasm module - which we will also build in Rust.
Rust makes it pretty trivial to set this up:
cargo new --lib example_wasm
This creates a basic library-type Cargo project. We need to add “cdylib” to Cargo.toml to tell Rust that this will be a dynamic library:
[package]
...
[lib]
crate-type = ["cdylib"]
First, we need the memory.
Alright, so we have our basic structure set up. Now we need to figure out how to copy the Arrow structs from our host into our Wasm module. Fortunately, Wasmtime lets us read and write arbitrary byte slices to and from the module’s linear memory:
// Writes this buffer to our memory at a given offset.
memory.write(&mut store, offset, buffer)?;
// Reads memory from the given offset into buffer, filling the buffer.
memory.read(&mut store, offset, buffer)?;
So great, let’s just write the data in and send the offset? Not quite. The language you compiled your Wasm module from has its own conventions for laying out linear memory, and it may already be using the region you pick for its stack, its static data or its heap. Overwriting any of that, for instance the module’s stack, can cause the dreaded undefined behaviour.
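To make the hazard a bit more concrete, here is a minimal sketch that classifies an offset using the numbers from the WAT output shown earlier. It assumes the layout that Rust’s wasm32 targets typically produce (stack at the bottom, growing down from __stack_pointer, then static data up to __data_end, then the heap); other toolchains may lay memory out differently. Region and classify_offset are illustrative names, not part of Wasmtime.

```rust
// Values taken from the WAT snippet earlier in this post.
const STACK_TOP: u32 = 1_048_576; // initial value of $__stack_pointer
const DATA_END: u32 = 1_087_144; // exported $__data_end
const WASM_PAGE_SIZE: u32 = 65_536; // page size fixed by the WebAssembly spec
const MEMORY_PAGES: u32 = 17; // initial size of the exported memory

/// What an offset is probably used for, under the assumed layout.
#[derive(Debug, PartialEq)]
pub enum Region {
    Stack,
    StaticData,
    HeapOrUnused,
    OutOfBounds,
}

/// Hypothetical helper: guess which region of linear memory an offset falls in.
pub fn classify_offset(offset: u32) -> Region {
    if offset < STACK_TOP {
        Region::Stack
    } else if offset < DATA_END {
        Region::StaticData
    } else if offset < MEMORY_PAGES * WASM_PAGE_SIZE {
        Region::HeapOrUnused
    } else {
        Region::OutOfBounds
    }
}
```

Under these assumptions, a write at offset 0 would land in the module’s stack. Even the HeapOrUnused region is not safe to write to blindly, since the module’s allocator may already have handed parts of it out.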
A workaround is to have your module export a memory allocation function similar to libc’s malloc. In your WebAssembly module code, you’d have:
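#[unsafe(no_mangle)]
pub unsafe extern "C" fn _malloc(len: u32) -> *mut u8 {
    // Reserve `len` bytes on the module's own heap.
    let mut buf: Vec<u8> = Vec::with_capacity(len.try_into().unwrap());
    let ptr = buf.as_mut_ptr();
    // Leak the vector so the allocation outlives this call.
    std::mem::forget(buf);
    ptr
}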
Here, we make a _malloc function that takes a length and returns a pointer to a byte buffer. On the host side this pointer shows up as a 32-bit integer, as Wasm is generally compiled with 32-bit pointers. We allocate a vector of bytes, take a raw pointer to its buffer, forget the vector (so that it is not freed at the end of the scope) and return the pointer.
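Since _malloc deliberately leaks its allocation, the module would eventually want a way to hand the memory back. The post does not define one, so the following is only a sketch of what a matching pair could look like. The _free export is hypothetical, and this variant of _malloc allocates a zeroed boxed slice instead of using Vec::with_capacity, so that the length alone is enough to reconstruct and drop the allocation.

```rust
/// Allocate `len` zeroed bytes on the module's heap and leak them,
/// returning the address of the first byte.
#[unsafe(no_mangle)]
pub extern "C" fn _malloc(len: u32) -> *mut u8 {
    let boxed: Box<[u8]> = vec![0u8; len as usize].into_boxed_slice();
    // Casting the fat slice pointer to a thin pointer keeps only the address.
    Box::into_raw(boxed) as *mut u8
}

/// Hypothetical counterpart to `_malloc` (not part of the original post).
///
/// Safety: `ptr` must have been returned by `_malloc(len)` with the same
/// `len`, and must not be used or freed again afterwards.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn _free(ptr: *mut u8, len: u32) {
    let slice = std::ptr::slice_from_raw_parts_mut(ptr, len as usize);
    // Rebuild the Box so that dropping it releases the allocation.
    drop(unsafe { Box::from_raw(slice) });
}
```

On the host, _free could be looked up the same way as _malloc, for example with get_typed_func::<(u32, u32), ()>, and called once the module no longer needs the data.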
Inside of our host environment, we’d reference this function as such:
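// Look up the exported allocator; the returned pointer is an offset into the module's memory.
let wasm_malloc_fn = instance
    .get_typed_func::<u32, u32>(&mut store, "_malloc")?;
// Ask the module for `len` bytes.
let offset = wasm_malloc_fn
    .call(&mut store, len)?;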
Storing our Arrow arrays.
Now that we have the memory and copy semantics, we can go about moving the Arrow data structures into our Wasm module and allow that module to execute arbitrary code on them.
Remember our C struct from the first blog post? Yeah, pretty much every pointer in that struct needs the same treatment: copy the data it points to into the module’s memory and replace the pointer with the new offset:
use wasmtime::{Memory, Store, TypedFunc};

pub fn clone_struct<T>(
    array: T,
    wasm_malloc_fn: &TypedFunc<u32, u32>,
    mut store: &mut Store<u32>,
    memory: &mut Memory,
) -> u32 {
    // View the given struct as a slice of raw bytes.
    let buffer: &[u8] = unsafe {
        std::slice::from_raw_parts(&array as *const _ as *const u8, std::mem::size_of::<T>())
    };
    // Malloc the given length in our wasm module.
    let ptr = wasm_malloc_fn
        .call(&mut store, buffer.len().try_into().unwrap())
        .unwrap();
    // Write the data to the offset.
    memory
        .write(&mut store, ptr.try_into().unwrap(), buffer)
        .unwrap();
    // Return the pointer (an offset into the module's memory).
    ptr
}
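One thing to keep in mind is that a host pointer is usually 64 bits wide, while a wasm32 module expects 4-byte pointers, so the bytes handed to clone_struct need to match the module’s view of the struct. The post does not show the type it passes in, so the following is only a sketch under that assumption: a hypothetical WasmArrowArray mirror of the C Data Interface’s ArrowArray in which every pointer field is a u32 offset, plus an as_bytes helper.

```rust
/// Hypothetical wasm32 view of `struct ArrowArray` from the Arrow C Data
/// Interface: same field order, but every pointer is a 32-bit offset.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct WasmArrowArray {
    pub length: i64,
    pub null_count: i64,
    pub offset: i64,
    pub n_buffers: i64,
    pub n_children: i64,
    pub buffers: u32,      // offset of the array of buffer pointers
    pub children: u32,     // offset of the array of child pointers
    pub dictionary: u32,   // offset of the dictionary array, or 0
    pub release: u32,      // function pointer slot, 0 meaning "null"
    pub private_data: u32, // opaque pointer, 0 meaning "null"
    /// Explicit tail padding: the struct is 8-byte aligned, so its size is
    /// rounded up to 64. Naming the padding keeps every byte initialised.
    pub _pad: u32,
}

/// View the struct as raw bytes, ready to be written into linear memory.
/// Bytes are in host order; Wasm memory is little-endian, so this matches
/// only on little-endian hosts.
pub fn as_bytes(array: &WasmArrowArray) -> &[u8] {
    // Sound because the struct is repr(C) and has no implicit padding.
    unsafe {
        std::slice::from_raw_parts(
            array as *const WasmArrowArray as *const u8,
            std::mem::size_of::<WasmArrowArray>(),
        )
    }
}
```

With a type like this, the pointer fields would be filled in with the offsets returned by _malloc before the struct itself is copied across.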
There are some caveats here
Arrays
An Arrow array may have children. The C Data Interface represents them as follows:
int64_t n_children;
struct ArrowArray** children;
Here you can see a count, n_children, and a double pointer to ArrowArray. n_children is the number of children that belong to this array, while children points to an array of n_children pointers, each of which points to its own Arrow array.
To copy this over, we iterate over each pointer, dereference it and copy the child array across. Once every child has been copied, we copy over the array of pointers to the newly copied children and keep a pointer to that array of pointers. Get it?
This looks like:
// First, copy each child struct into wasm memory, collecting the returned offsets into a Box<[u32]>.
let children_box: Box<[u32]> = children
    .into_iter()
    .map(|child_schema| clone_struct(child_schema, wasm_malloc_fn, store, memory))
    .collect::<Box<_>>();
// Then allocate room in the module for the array of pointers.
let children_ptr = wasm_malloc_fn
    .call(
        &mut store,
        std::mem::size_of_val(
            children_box.as_ref()
        ).try_into().unwrap(),
    )?;
// Copy the array of pointers over.
memory
    .write(
        &mut store,
        children_ptr.try_into().unwrap(),
        u32_to_u8(&children_box),
    )?;
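The u32_to_u8 helper used above is not shown in this post. A minimal sketch of what it might look like, assuming it simply reinterprets the pointer table as bytes, is given below. The second function, u32s_to_le_bytes, is a hypothetical alternative that spells out the little-endian byte order Wasm linear memory uses, at the cost of an extra allocation.

```rust
/// Reinterpret a slice of u32 offsets as raw bytes in host byte order.
/// Only matches Wasm's little-endian memory on little-endian hosts.
pub fn u32_to_u8(words: &[u32]) -> &[u8] {
    // Sound because u8 has alignment 1 and u32 contains no padding bytes.
    unsafe {
        std::slice::from_raw_parts(words.as_ptr() as *const u8, std::mem::size_of_val(words))
    }
}

/// Hypothetical endian-explicit variant: always produces little-endian bytes.
pub fn u32s_to_le_bytes(words: &[u32]) -> Vec<u8> {
    words.iter().flat_map(|word| word.to_le_bytes()).collect()
}
```

Passing &children_box works with the first signature because a &Box<[u32]> dereferences to a &[u32].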
Release Function
Closer to the bottom of the struct, you can see that there is something called the “release callback”:
// Release callback
void (*release)(struct ArrowArray*);
This is a pointer to a function that “cleans up” the array: the consumer calls it once it is done with the array, and it frees all the resources held by the struct.
As we assume that the WebAssembly module will only be executed for a short period of time and that the lifetime of the data can be tied to the lifetime of the WebAssembly module, we’re just going to make this a null pointer (we just need to make sure that the module never tries to invoke this function without first checking that it is not null). One caveat: the C Data Interface treats a null release pointer as the marker of an already-released structure, so a consumer that follows the spec strictly may refuse such an array.
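On the module side, the null check could look something like the sketch below. The ArrowArray definition follows the field order of the C Data Interface struct, with the callback modelled as an Option of a function pointer so that null maps to None. release_if_set is a hypothetical helper name, not something defined by Arrow or by this project.

```rust
use std::ffi::c_void;

/// Rust view of `struct ArrowArray` from the Arrow C Data Interface.
#[repr(C)]
pub struct ArrowArray {
    pub length: i64,
    pub null_count: i64,
    pub offset: i64,
    pub n_buffers: i64,
    pub n_children: i64,
    pub buffers: *mut *const c_void,
    pub children: *mut *mut ArrowArray,
    pub dictionary: *mut ArrowArray,
    /// A null function pointer is represented as `None`.
    pub release: Option<unsafe extern "C" fn(*mut ArrowArray)>,
    pub private_data: *mut c_void,
}

/// Hypothetical helper: invoke the release callback only if one is set.
/// Returns true if the callback was called.
///
/// Safety: `array` must point to a valid, initialised ArrowArray.
pub unsafe fn release_if_set(array: *mut ArrowArray) -> bool {
    match unsafe { (*array).release } {
        Some(release) => {
            unsafe { release(array) };
            true
        }
        // Null callback: nothing to clean up from the module's side.
        None => false,
    }
}
```

With the host writing a null pointer into the release slot, release_if_set would simply return false and leave the memory alone.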
Conclusion
And voila! We have a rough implementation of how we might do all the things we want to do with this project. Let’s pull it all together and see how it works!