Building an RPC framework for the Rust programming language

Once I realized how much work it takes to make systems programming in Rust a little easier, I made up my mind to take it as a chance to implement some basic distributed system features, like the Raft consensus algorithm. The foundation of a distributed system is a mechanism for servers to communicate with each other. RPC comes with a predefined protocol for each command, including its name and parameters, which makes it ideal for large systems.

I have built two kinds of systems that rely on RPC. Although both are designed to let a client invoke functions on another server, the actual procedures are totally different.

The first one is a Clojure project. Because I can compile Clojure code at runtime or look up a function through its namespace and symbol, developing on that framework is dead simple: delivering the full name of the function and its parameters in a map when invoking is all that is needed. The functions to invoke are all ordinary ones; no extra definition is required. It looks convenient, but it is inefficient due to function name encoding/decoding and parameters carrying full map key names, and this RPC is only available to Clojure applications, which means other programming languages cannot understand the data easily.

In the second project, I used Protocol Buffers from Google instead. Protobuf requires developers to define every command, its parameters and its return value as messages in a file. Google built tools to compile those files into source code in the programming languages we want. It is far more efficient than my previous home-brew implementation, and it can also deliver messages between applications built in different programming languages. But maintaining a protobuf definition file is cumbersome and not agile enough; things may break after recompilation.

When searching for an RPC framework for my Rust projects, I wanted it to be as flexible as what I built for Clojure, but also efficient. I tried protobuf and the even faster Cap'n Proto, but was not satisfied. I also could not simply copy the approach I used in Clojure, because Rust is statically typed for performance and there is no way to link to a function at runtime from a string. After I found tarpc (yet another project from Google), I was inspired by its simplicity and decided to build my own.

The most impressive part I took from tarpc is the service macro, which translates a simple series of statements into a trait and a bunch of helper code for encoding/decoding and the server/client interfaces. Although tarpc is more complex because it also supports async calls, the basics are the same. We still need to define what can be called from a client, but the protocol definition only exists in your code. Developers can define the interface with the service macro, implement the server trait and generate server/client objects. For example, a simple RPC server protocol can look like this:

service! {
    rpc hello(name: String) -> String;
}

This will generate the code we need for both server and client. Next we need to implement the actual server behavior for hello, like this:

#[derive(Clone)]
struct HelloServer;

impl Server for HelloServer {
    fn hello(&self, name: String) -> Result<String, ()> {
        Ok(format!("Hello, {}!", name))
    }
}

To start the server

let addr = String::from("127.0.0.1:1300");
HelloServer.listen(&addr);

To create a client and say hello to the server

let mut client = SyncClient::new(&addr);
let response = client.hello(String::from("Jack"));

The response should already contain the greeting string from the server.

let greeting_str = response.unwrap();
assert_eq!(greeting_str, String::from("Hello, Jack!"));

It looks simple and idiomatic, just like defining a function in Rust itself. There is no need to create another file and compile it every time we make a modification, because the service macro does all of the job for us under the hood.

Most of the work is done at compile time: function names are hashed into integers for network transport, and the parameters of each function are encoded into a struct for serialization. This requires developers to configure a compiler plugin, but the performance gain is worth the effort.
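For illustration only, here is a rough sketch of the kind of code such a macro could expand into; the names (HelloArgs, HELLO_ID, dispatch) are hypothetical and not taken from the actual framework.

// A minimal sketch (not the real generated code): the macro could turn the
// `hello` rpc statement into a parameter struct plus a numeric id that is
// used on the wire instead of the function name.
struct HelloArgs {
    name: String,
}

// Hypothetical id assigned to the `hello` call at compile time.
const HELLO_ID: u64 = 1;

trait Server {
    fn hello(&self, name: String) -> Result<String, ()>;
}

// The generated server side decodes the id and arguments, then calls the
// trait method implemented by the user.
fn dispatch<S: Server>(server: &S, id: u64, args: HelloArgs) -> Result<String, ()> {
    match id {
        HELLO_ID => server.hello(args.name),
        _ => Err(()),
    }
}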

There is still more work to do to improve the RPC framework, like adding promise pipelining, async calls, version control and so on. My next step is to understand and implement the Raft algorithm on top of this RPC framework. I have put my code on GitHub if you are interested in taking a look.

Language choice for my next pivot

I have worked on distributed systems and systems programming for a while, by implementing a key-value store based on the RAMCloud design and some other trivial middleware. After drilling into high-performance programming, I found it hard to take control back from the JVM to do the kind of significant optimization that really matters for a database system. I tried to work around some of the problems by keeping my data off-heap to gain memory efficiency without wasting space on object headers, but I don't think this is what the JVM intended developers to do, because it can backfire (JVM crashes). The most severe problem is the GC found in every higher-level programming language that intends to free developers from memory management. The JVM in OpenJDK does a great job in most circumstances, but it always fails in performance-critical tasks with stop-the-world GC pauses and lag. The JVM plus the Clojure runtime is also burdensome for this kind of project: Morpheus spent about 1.6 GB of RAM just to start up, which makes it impossible to run on embedded devices and cheap virtual machines.

I was looking for a language with a minimal runtime, multi-platform, running on bare metal without a virtual machine, super fast, expressive, and efficient without making me babysit memory management; something suitable for my system software designs. I wrote Go for a while, expecting it to become my weapon of choice, because it is led by my favourite company, Google, and it is also quite famous in the community. At first, I did have a great experience: the language itself is very easy to learn (it took me about a day), its ecosystem is well developed, and the toolchain is not painful. But when I tried to build something a bit larger, I found it missing some features I expected.

First of all, it does not support generics, because the Go team wants to keep the language clean and thinks they are not necessary. So I have to convert data types for every value I get from a hash map, and to keep my functions flexible I have to write long and clumsy interface{} expressions. Writing a sorting comparator is also painful because of the lack of language features. Second, it has no macros. After working with Lisp-like languages, I found macros to be the key feature for saving time on repetitive code structure and for gaining performance by doing some work at compile time; for some reason, Go simply does not have such a feature. Third, Go is not that fast as a systems programming language. In some standard benchmarks, Go is not able to outrun Java, its garbage collector is not as well developed as Java's, and a simple hello-world HTTP server fails to beat Netty in almost every case.

I really thought I had to go back to the C/C++ way, like most popular systems did, until I found another programming language named Rust, made by Mozilla and used to power their next-generation web browser engine, Servo. I had heard about Rust for a while, mostly for its unique strict checks and scope rules that manage memory at compile time, like ARC from Apple's LLVM but stricter, ensuring no memory leaks. It suits my needs because I want to manage memory by myself only when I have to. Rust provides scope rules based on ownership and borrowing, and I can still manage memory on my own if I do the job in an unsafe block. Even when I manage memory my own way, I can use its scope rules to release it, like this:

// Other fields of the real Chunk struct are omitted here for brevity.
struct Chunk {
    addr: usize,
}

impl Chunk {
    fn new(size: usize) -> Chunk {
        // Allocate raw memory with libc and keep only its address.
        let mem_ptr = unsafe { libc::malloc(size) } as usize;
        Chunk {
            addr: mem_ptr,
        }
    }
    fn dispose(&mut self) {
        info!("disposing chunk at {}", self.addr);
        unsafe {
            libc::free(self.addr as *mut libc::c_void);
        }
    }
}

impl Drop for Chunk {
    fn drop(&mut self) {
        self.dispose();
    }
}

After using malloc from libc, I don't need to explicitly decide when to free; I let Rust do the job by implementing the Drop trait. This lets me combine the power of both C and Rust itself. The concepts of ownership and borrowing seem annoying at the beginning, because pointers and objects cannot be passed between functions freely, but they make sense when you consider the problem at a deeper level. Rust sets fine-grained rules to avoid the common mistakes programmers make in both concurrency and memory management.
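To make the ownership rules concrete, here is a minimal usage sketch assuming the Chunk type above: the chunk is freed automatically as soon as it leaves its scope.

fn use_chunk() {
    let chunk = Chunk::new(4096); // memory from libc::malloc, owned by `chunk`
    // ... read and write through chunk.addr in unsafe code ...
    println!("chunk lives at 0x{:x}", chunk.addr);
} // `chunk` goes out of scope here, Drop runs and libc::free is called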

Rust is light; you can even write an operating system with it, and some universities have already asked their students to do so. It is cross-platform because it compiles through LLVM. Rust has a pattern-match-based macro system, with more in progress. It has Cargo for package management. It supports almost every feature a modern language should have, like blocks, lambdas, type inference, C library interop and more.

There seems to be nothing wrong with Rust itself; I kind of like this language after a week of trial. The downside is that it is still favoured by a small group of people. Its community is active and the people are nice, but there are not many libraries you can just borrow and use. In my case, I couldn't find a well-maintained concurrent hash map and had to build one on my own. Well, that's how open source works.

I have already halted the development of the JVM-based Morpheus and Nebuchadnezzar, and started to redo what I had done in Rust. Although I am able to carry over most of the ideas I learned from the previous version, I found it more challenging, because there are more operating-system-related concepts, and some of the work that had been done by others in Java has to be migrated to Rust on my own. I respect the power of community: the reason the Java ecosystem is so powerful is nothing but the accumulation of work from community members, year after year.

Multifunctional Home Server Virtualization

My IDC server is down again. I wondered whether it is possible to host the server in my home and retire the IDC server; then I could reclaim hardware from the problematic server (its storage is still decent) and also save CNY 4,300 annually. I do have a spare machine that can do this job; it has 32 logical cores and 128 GB of RAM. The problem is that besides using it as my test server, I also use this machine as a Windows gaming platform, which requires a powerful graphics card and the Windows operating system (my servers all run Linux). When the machine is used as a server, it has to be stable and avoid downtime as much as possible. I also want to use the graphics card in the machine for my GPU and CUDA experiments. The operating system for the experiment platform is Ubuntu, and rebooting the host server is not an option in this case.

Virtualisation looks like a solution. I was already able to use this machine both as a compute box running Ubuntu Server and as a gaming platform in Windows with the help of Hyper-V on Windows 8. There will be three virtual servers on the host, two of which need to use the graphics card. Because there is only one card available, only one of those two VMs can run at the same time. If a VM wants to use hardware on the host's PCI-E bus, the device has to be passed through to the VM. When a device is passed through, it becomes unavailable to the host and to other VMs. It follows that if you want to use PCI passthrough for your graphics card, you also need another card for the host, or you won't see anything after the operating system boots.

The final hardware configuration. Each VM has its own storage on a hardware RAID or a dedicated hard drive. The host system lives on the F60 SSD with a GT 210; the GTX 1080 is the graphics card for PCI passthrough.

Because Hyper-V does not support PCI passthrough, I had to look for other solutions. As a result of my research, almost all of the solutions that support PCI passthrough are based on the Linux kernel. The first one that came to my mind was XenServer from Citrix. It is free and open source. I successfully installed it on my host, installed Windows and passed the graphics card through. But the driver from Nvidia could not get the card working: Windows recognized the model, but there was always a yellow exclamation mark in the device manager, and I got a black screen with no signal every time the VM started. I think that might be a compatibility problem. Then I turned to VMware ESXi; unfortunately, its installer could not even load on my machine.

With the two most established solutions ruled out, I thought I had to build my own setup from scratch. XenServer is an enterprise-facing platform based on Linux, QEMU and KVM. I found some articles about building my own virtualization solution manually on a regular Linux distribution like Xubuntu 16.04. The advantage of this approach is that I can control every delicate detail when setting up the VMs for my system.

Getting a normal VM without PCI passthrough working is easy, but for the machine that requires a real graphics card, it becomes tricky. First you need to bind the device to pci-stub in the kernel to prevent it from being used by the host, then it needs to be switched to VFIO for the VMs. After following the instructions on the web page I mentioned, I got almost the same problem I encountered on XenServer: there was no output from the graphics card. And when I used the virtual graphics adapter from QEMU, the driver stopped working after installation.
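For reference, the pci-stub/VFIO step usually looks roughly like this (the vendor:device IDs below are placeholders for your own card, as reported by lspci -nn; this is a sketch, not my exact configuration):

# Kernel command line (e.g. in GRUB): enable the IOMMU and reserve the GPU
# and its HDMI audio function so the host driver never claims them.
intel_iommu=on pci-stub.ids=vvvv:dddd,vvvv:eeee

# Before starting the VM, hand the reserved device over to VFIO.
sudo modprobe vfio-pci
echo "vvvv dddd" | sudo tee /sys/bus/pci/drivers/vfio-pci/new_id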

I went through a lot of searching and found an article saying that new cards cannot work with the default SeaBIOS; they need OVMF instead. The solution is simple. First install the OVMF firmware:

sudo apt-get install ovmf

then add the firmware parameter to your VM's QEMU command with:

-bios /usr/share/ovmf/OVMF.fd
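Putting it together, the QEMU invocation for the passthrough VM ends up looking roughly like this (the PCI address, sizes and paths are illustrative, not my exact command):

qemu-system-x86_64 -enable-kvm -cpu host -smp 16 -m 16G \
  -bios /usr/share/ovmf/OVMF.fd \
  -device vfio-pci,host=01:00.0 \
  -drive file=/path/to/windows.img,format=raw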

After that I was able to see output on the screen from the passed-through graphics card.

How about the performance? I did a quick test in the VM with the 3DMark demo. I got 6582 for graphics and 5947 for CPU (8 cores, 16 threads). That looks close to native.


I have also set up an Ubuntu Desktop VM using the same QEMU configuration and got the CUDA SDK and mxnet examples working. Those two systems can be switched by shutting one down and starting the other, without interrupting the third virtual machine running as an application server.

Lessons learned about locks

Locks are handy and unavoidable in parallel programming, especially in applications like databases, where multiple threads may modify one piece of data. Locks make sure that only one thread can access and modify the data at the same time.

The Morpheus key-value store relies on locks heavily. There are locks for segments in trunks, and locks for cells. Recently, I have been trying to improve the performance of the store and reduce its memory footprint.

My first attempt was to replace the CellMeta Java objects with plain addresses (a long) for each cell index. This saves at least 24 bytes each, which adds up to a considerable amount of space when there may be millions of cells per node. But there is one compromise: originally I used the synchronized keyword to lock on cells, and once the object entity is replaced by an address, there is nothing left to synchronize on. My first solution was to create 1024 locks for each trunk and assign each cell to one lock according to its hash, so one lock may correspond to multiple cells. It looks like a manageable solution; even the JDK uses this principle to implement its ConcurrentHashMap. But it will produce deadlocks if you try to build a lock chain in parallel. A lock chain means one cell is locked while another cell is already held; when two or more locks are acquired in different orders, deadlocks occur. In this key-value store, deadlocks may happen even when the lock targets are unrelated, because cells may share the same lock.

After discovering this defect, I implemented a lock cache. Because I don't want to create a lock for every cell up front, I only want lock objects to be created when they are needed, to save memory. When a cell is asked to be locked, the store tries to find the lock in the cache according to the cell hash; if it is found, that lock is used, otherwise a new one is created. When the lock is released and the lock count of the cell is zero, the lock is removed from the cache and left for the GC to collect. The cache has a small impact on performance, but it is more memory efficient. It is equivalent to the previous synchronized approach, a bit heavier, but able to provide read-write lock semantics rather than a spin-lock monitor.
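To illustrate the idea, here is a Rust sketch of such a lock cache (the store itself is Java/Clojure at this point, and the names here are invented): locks are created on demand per cell hash and removed once nobody holds them.

use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

// Sketch of a lock cache: one lock per cell hash, created lazily and dropped
// again when its reference count falls back to zero.
struct LockCache {
    locks: Mutex<HashMap<u64, (usize, Arc<RwLock<()>>)>>, // hash -> (refcount, lock)
}

impl LockCache {
    fn acquire(&self, hash: u64) -> Arc<RwLock<()>> {
        let mut map = self.locks.lock().unwrap();
        let entry = map
            .entry(hash)
            .or_insert_with(|| (0, Arc::new(RwLock::new(()))));
        entry.0 += 1;
        entry.1.clone()
    }

    fn release(&self, hash: u64) {
        let mut map = self.locks.lock().unwrap();
        let empty = match map.get_mut(&hash) {
            Some(entry) => {
                entry.0 -= 1;
                entry.0 == 0
            }
            None => return,
        };
        if empty {
            map.remove(&hash); // nobody needs this lock any more
        }
    }
}

A caller acquires the Arc, takes a read or write lock on it, and calls release after unlocking. Note that cells hashing to the same value still share a lock, so lock chains remain dangerous.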

I used reentrant locks the first time, but I didn't fully understand what the term 'reentrant' means. It means you can lock the same lock in the same thread more than once without causing a deadlock [1]. This feature is useful for writing cleaner code without worrying about the lock status. The rule for using a lock is that lock and unlock must appear in pairs, regardless of what happens in between, even if an exception is thrown. In Java and Clojure, unlock should be put in the finally block of a try clause; the catch block is irrelevant. When you are using a reentrant lock, the number of lock calls must equal the number of unlock calls, or deadlocks and exceptions will occur. This rule looks easy, but in some situations developers still need to track the locks, like reference counting in old-school iOS programming without ARC. I made the mistake of using guards like if (! l.isLockedByCurrentThread()) l.lock() and if (l.isLockedByCurrentThread()) l.unlock(). This makes the lock get locked in the outer layer but released in the inner layer, which causes inconsistency.

Programming concurrent applications is delicate work. Developers have to consider all of the possible corner cases and use locks prudently. Using locks too much or in unnecessary places damages performance, but not using them when required causes unexpected states and data corruption. Some applications claim better performance than others but are unstable in practice, likely because their developers traded stability for performance. Many lock problems are also hard to test without proper production-level stress, so a mature project should test all of the possible circumstances before release.

Graph traversal in distributed systems

Last month I finished depth-first search and breadth-first search in Morpheus. Morpheus, based on a hash-distributed key-value store, requires traversing vertices in a distributed and even parallelised way.

The first thing that came to my mind was building a message passing infrastructure. It is almost the same as the RPC in the cluster connector, but uses a more succinct and predetermined data format to improve performance. Message passing is asynchronous: the sender does not wait for the receiver to complete the actual processing, only for the delivery. Each message for a task has a 128-bit ID so nodes can tell which task the messages belong to. Each message also corresponds to one action, which enables the message dispatcher to invoke the function bound to that action. Just like the RPC in the cluster connector, if the message receiver is the same as the sender, the message will not go through the network interface and will not even be serialized or deserialized for transmission.
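As a rough sketch of this infrastructure (in Rust, with invented names; it is not the actual implementation), a message envelope and its dispatcher could look like this:

use std::collections::HashMap;

// A message carries a 128-bit task id, an action code and a compactly
// encoded payload; the dispatcher maps the action to a handler function.
struct Message {
    task_id: u128,
    action: u16,
    payload: Vec<u8>,
}

struct Dispatcher {
    handlers: HashMap<u16, fn(u128, &[u8])>,
}

impl Dispatcher {
    fn dispatch(&self, msg: &Message) {
        // Messages addressed to the local node can skip the network and
        // serialization entirely and go straight to the handler.
        if let Some(handler) = self.handlers.get(&msg.action) {
            handler(msg.task_id, msg.payload.as_slice());
        }
    }
}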

Both the DFS and BFS algorithms are built on this infrastructure, but their behaviours vary. For example, DFS cannot be parallelised: to conduct DFS on a distributed system, the stack of visited and discovered vertices must be transferred between nodes, one node at a time, for updates. BFS, on the other hand, can be parallelised by having each node discover the children of the vertices it owns at each level. We will discuss how these two kinds of graph traversal are implemented in Morpheus in the next few sections.

DFS in Morpheus adapts ideas from the paper by S.A.M. Makki and George Havas: the vertex stack is passed between nodes for updates. This method is single-threaded because depth-first search is inherently sequential. DFS in Morpheus is a good choice when the subgraph the start vertex belongs to is not very large. It is also suitable for conventional graph traversal in user queries, like detecting links and the existence of a path.

BFS in Morpheus is more complicated. Currently, Morpheus supports parallel search within each level. Morpheus relies on a nested fork-join multithreading pattern, illustrated in the figure below.

(Figure: the nested fork-join threading pattern)

Consider the nodes containing the vertices represented as A, B, C in 'Parallel Task I'; 'Main Thread' is the coordinator server (which may be any one of the nodes in the cluster); 'Parallel Task I', 'Parallel Task II' and 'Parallel Task III' are the search tasks for each level. A BFS request contains a start vertex, an end vertex and search rules. Here is what happens when the search begins (a sketch of the coordinator's loop follows the list).

  1. The coordinator first puts the start vertex in the search list, sends the list to the node that owns the vertex (by its id hash) and waits for the return message.
  2. The node receives the search list and fetches the neighbour ids of its vertices in parallel, according to the search rules.
  3. When all parallel searches for the list have ended, the node sends the search results back to the coordinator as the return message.
  4. The coordinator may receive return messages from multiple servers (though not when there is only one vertex in the search list). As each return message arrives, it updates the local task status of each vertex, recording whether it has been visited, its level, and its parent.
  5. After the coordinator has received all of the return messages, it extracts the vertices for the next level from the local task status, partitions them by their servers into search lists and sends the lists to those servers.
  6. The level advances and step 2 takes over again. The iteration continues until a stop condition is fulfilled or the maximum level in the search rules is reached while executing step 5.
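A highly simplified sketch of the coordinator's level loop (Rust-flavoured, with invented names and a synchronous call standing in for the asynchronous messages; the real implementation differs):

use std::collections::{HashMap, HashSet};

type VertexId = u64;
type ServerId = u32;

// Stand-in for the real placement function that hashes a vertex id to the
// server owning it.
fn owner_of(v: VertexId, servers: u32) -> ServerId {
    (v % servers as u64) as u32
}

// One BFS level: partition the frontier by owning server, ask each server for
// the neighbours of its vertices, then merge the results into the next level.
fn bfs_level(
    frontier: &[VertexId],
    visited: &mut HashSet<VertexId>,
    servers: u32,
    fetch_neighbours: impl Fn(ServerId, &[VertexId]) -> Vec<VertexId>,
) -> Vec<VertexId> {
    let mut by_server: HashMap<ServerId, Vec<VertexId>> = HashMap::new();
    for &v in frontier {
        by_server.entry(owner_of(v, servers)).or_default().push(v);
    }
    let mut next = Vec::new();
    for (server, list) in by_server {
        for n in fetch_neighbours(server, list.as_slice()) {
            if visited.insert(n) {
                next.push(n); // first time seen: it belongs to the next level
            }
        }
    }
    next
}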

The idea for this algorithm was adapted from the Lawrence Livermore National Laboratory work on BlueGene. I don't think I have fully replicated their design, because in my implementation the coordinator becomes the bottleneck in steps 4 and 5; it also costs a lot of memory to cache the vertices that have already been discovered, which is why I also provide an on-disk cache for this case. BFS is used for finding shortest paths: faster, but more resource-consuming path finding. It may also become the backbone of many other algorithms, like fraud detection and community discovery.

To distribute task information like the task id and parameters to each node in the cluster, we need a globally shared hash map whose changes can be propagated to every node. I use Paxos to do the job. There are also some other components, like a disk-based hash map, that might come in handy. I packed those computation-related parts into the hurricane component.

If you want to take a closer look at my actual implementation, please refer to: DFS, BFS

Crashing the G-WAN web server, at the speed of light

Recently I took a job, working as a humble software engineer on the server team of a cloud gaming company.

My employer, who is also a software engineer, mostly works on game engines, writes C++ and C#, holds some game demo I.P. (hopefully) and claims he has solved the problem of building profitable 3D cloud gaming servers, which is the major technical problem that would impact their business model (again, hopefully; most cloud gaming providers went bankrupt or earn low profits due to server and bandwidth expenses). In short, he has no actual experience in building a cloud server platform. That's why I thought I might be able to help him build products that are robust and scalable.

Although I write Java and Clojure code, I don't detest C/C++ or any other language that compiles to machine code for performance. Actually, I love high performance, and I really expected to learn something about it from the job.

They have a great vision, and guaranteed they would sell the company for 3 billion dollars and everyone in the company would get 5 million dollars.

Looking good. "What should I do?" I asked. "We have found the ultimate solution," they smiled mysteriously. I was really intrigued to know what secret they had found. "We are asking you to write our web server in the C++ programming language." Sounds challenging, and I like challenges in my work. Later they presented me a web server that I had never heard of: G-WAN.

They told me to do some research on this one (the other software was ArangoDB, which was fine). Because they didn't tell me what I was actually going to do in the project, I started by searching the internet for introductions and documentation.

The first thing I investigated was their web site.

(Screenshot: the G-WAN front page)

The web page design looks decent. What they highlight on the front page is something like "It's the era of multicore computing". I surely know that; those words look like they were written for project managers and for people with no experience in server programming.

What's next? On the about page, I expected more detailed information about how this software works. Instead, I got this.

(Screenshot: the G-WAN about page)

I could smell mouldy medals. Then I dug into the website, trying to find more technical material. On the FAQ page, I started to feel strange and disturbed.

Why? Because none of the cases in the FAQ use modern hardware or operating systems. Instead there are plenty of ancient and outdated configurations, like a "faster and more scalable" Ubuntu 10.04, and their usual test CPU, an Intel Xeon W3680 @ 3.33GHz (6 cores). I started to wonder whether I had jumped back to the year I enrolled in my undergraduate school. The website looks more like a fraud to me. But I might be wrong, so I started to find out how many servers are actually powered by G-WAN.

Shodan can do this job for me, and it gave me an interesting result. For a server released about 8 years ago, there are only approximately 21 HTTP servers online, and most of them are serving static content or are totally abandoned.

(Screenshot: Shodan results for G-WAN servers)

I stopped taking a deep look at the project itself, because I knew there must be some reason why this general-purpose server is not accepted and has so few users; even the Clojure HTTP server http-kit powers at least 800 sites, and it is young and not even the most popular one in the small Clojure community.

I started to search for the server name on Google. There is not much about it, but there are some threads on Reddit and Hacker News.

https://www.reddit.com/r/linux/comments/2mgo7o/wtf_is_gwan_web_server/

https://news.ycombinator.com/item?id=4109698

https://news.ycombinator.com/item?id=8130849

and some blogs

https://riccardo.forina.me/why-i-ll-never-trust-g-wan/

http://tomoconnor.eu/blogish/gwan-snakeoil-beware/

Most of them are really critical, and I didn't know how much they had suffered to get this angry. I found out later by myself.

I reported my research to my boss, telling him I would not recommend this software for our projects. But it seems they didn't agree with that and told me to run the test and then make my decision.

Unfortunately, I got a really bad stomachache that day and had to stay at home, waiting for my No. 2 to come out at any time (sorry for the description). I also realised that there was no qualified server-grade equipment in the office, and the only way to perform a comprehensive test was to use my own cluster, which I normally use only for my own development and my own projects. Because I really wanted to know how fast G-WAN could be, I suggested staying at home to test G-WAN and the other candidates on my equipment.

Then I totally destroyed my last expectations of the software. For the record, I posted the results from ApacheBench and Weighttp to the GitHub repo. I have to say, it is not a complete test, but I am pretty sure G-WAN is totally and utterly a bad choice for any project that is not a toy.

Because it crashed under a 1k concurrency test, within a second.

Why did it crash? I wondered. I started to look for clues, but nothing was left in the log, except this:

Could not attach to process. If your uid matches the uid of the target
process, check the setting of /proc/sys/kernel/yama/ptrace_scope, or try
again as the root user. For more details, see /etc/sysctl.d/10-ptrace.conf
ptrace: Operation not permitted.
No frame selected.
Signal : 11:Address not mapped to object
Signal src : 1:.
errno : 0
Thread : 10
Code Pointer: 000000404f4c module:gwan function:?? line:0
Access Address: 000000000018
Registers : EAX=ffffffffffffffff CS=00000033 EIP=000000404f4c EFLGS=000000010246
 EBX=7fa0b19ff000 SS=00000000 ESP=7fa0d8ffa170 EBP=7fa0d8ffa1c0
 ECX=000000000008 DS=00000000 ESI=0000ffffffff FS=00000033
 EDX=000000000008 ES=00000000 EDI=7fa0b1a009b0 CS=00000033
Module :Function :Line # PgrmCntr(EIP) RetAddress FramePtr(EBP)
No available stack frames
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[1] 31224 segmentation fault (core dumped) sudo ./gwan

It still doesn't make much sense. What I can see from this segmentation fault is that the developer got backfired by dark magic, reading a deallocated memory address (I hit almost the same problem in my graph database project when operating on unsafe memory, but I fixed it).

I reported this exception to my boss. He said, "That must be your fault. You must be using it wrong."

I panicked. I could foresee my miserable life: never having a chance to go home when I should, to play with my cat and my PlayStation.

He may have sensed it too, and my co-workers also had no faith in developing a regular web server on such an unstable foundation. My boss agreed to let us use golang instead.

"What a nice day," I thought. "But I will use it on my own," he said. I smiled at him and didn't say anything else.

So, why are they so crazy about it? It seems most people are attracted by its amazing comparison chart.

(Chart: G-WAN's benchmark chart comparing it with Nginx and Lighttpd)

In the chart, G-WAN is more than 2x faster than every other web server, including Nginx, which is considered the fastest and most widely used web server in the world and looks like crap there. If that were true, then those open source authors would either be dumb-asses or the G-WAN guys would be real geniuses. But truly, they are not.

They are not even close to their claim. According to my test results, G-WAN is at most 30% faster than Jetty. Compared to golang, G-WAN does 100k qps against 70k qps for plain goroutines without any framework. And once you consider building something on top of G-WAN, it is going to be a nightmare.

I am not trying to persuade my employer to give up hope on it, because he has paid for the support subscription. It looks like he trusts salesmen more than his own team. Nice job helping those rich Swiss people. He will not understand what I have suffered until he does the same evaluation I did.

One week after I submitted my report, those "TrustLeap" guys gave me their comments on my test and totally ignored my questions about how the crash happened. They criticized me for not using a really fast operating system (in their view, the newest one is not the fastest), said my kernel version was too high, and complained that I tested their web server with the X window system running. But they just didn't explain anything about WHY THEIR SERVER CRASHED. They implied that the only way to run their server right is to use an old OS like Ubuntu Server 10.04, which is no longer supported by Canonical. This is how those guys treat their customers.

I was so furious about wasting my time on such a program, and about this problematic stuff not being recognized by my employer. From another perspective, it is a successful project: it did attract people like my employer to pay for it, even though he does not know anything about distributed systems, scalability or software architecture. It is also not a successful project, because their trick is so obvious to professionals that it can only fool newbies. I am going to end this unpleasant article with a quote from voretaq on ServerFault.

I thought the IT world learned its lesson about sacrificing all in the name of performance with qmail and the rest of the djb family of things...

Building a Graph Engine -- Computation Engines

Morpheus is a distributed graph engine. Once I resolved its storage problem, I started to build its computation engines. If I just leave the project at its current stage, as a pure high-performance graph storage, users have to write their own algorithms. Most importantly, client-side traversal may cause performance issues in communications.

Currently Morpheus uses message passing to establish BSP and other traversal-based computation models. To reduce the size of each message, Morpheus can distribute task parameters to all of the machines in the cluster. The current commit already contains distributed depth-first search for path search, sub-graph construction for a vertex, and bulk update, but I haven't finished all of them yet, because the conditions to stop a search or an update require a query language in the system, like Cypher in Neo4j, which I am working on to provide flexible search and update functionality.

I haven't decided on the format of the query language. A Lisp-like expression (S-expression, or Polish notation) is preferable because it is easy to implement in the parser. I will not simply eval the code, due to security concerns and performance (eval in Clojure generates a new class whose life cycle is not manageable, possibly leading to memory leaks). The problem with S-expressions is that few people are used to them. People usually read 1 + 1 = 2, but an expression like (= (+ 1 1) 2) looks strange to them. Another advantage of S-expressions is that they are more succinct: 1 + 2 + 3 + 4 can be represented as (+ 1 2 3 4).
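As an illustration of why S-expressions are easy to handle (a Rust sketch just for this post, not the planned query language), a tiny AST and evaluator for the (+ 1 2 3 4) style of arithmetic fits in a few lines:

// Minimal S-expression AST for arithmetic: evaluation needs no operator
// precedence rules, and (+ ...) naturally takes any number of operands.
enum Expr {
    Num(i64),
    Add(Vec<Expr>),
}

fn eval(expr: &Expr) -> i64 {
    match expr {
        Expr::Num(n) => *n,
        Expr::Add(args) => args.iter().map(eval).sum(),
    }
}

fn main() {
    // (+ 1 2 3 4)
    let e = Expr::Add(vec![Expr::Num(1), Expr::Num(2), Expr::Num(3), Expr::Num(4)]);
    assert_eq!(eval(&e), 10);
}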

Morpheus also has a code distribution feature. Users can write plain code, even packed as a Leiningen project, send it to the cluster and run it remotely without compiling anything. It is useful when users want to run their own algorithms in the cluster. There should be a sort of API or framework for this, but I haven't done it yet.

A future plan includes an RDD-based computation module for bulk data processing and map-reduce computation models, like GraphX in Apache Spark. It may treat vertices and edges as documents, and users can join those documents with other sources. This computation model may not fit graph traversal models, but it is useful when relations are not the first-class problem in a case. The RDD module may need a dedicated memory management policy, because the memory space required for a computation is unpredictable; RDDs may have to be temporarily flushed to disk instead of persisting in memory, for example in an LRU manner.

Computation models seem to be a much more challenging topic in this project, and I have so little time to finish the job. That is the reason I consider Morpheus my second personal long-term project (the first is the WebFusion project, which is paused and will continue after Morpheus).

 

Servers are crashing, bad memory modules or hot weather?

I cannot figure out the reason, but recently two of my servers, both with 8 x 16GB of Kingston DDR3 1600 MHz ECC REG memory, gave me correctable ECC memory error messages. One of the servers can recover by itself; the other is totally dead, stuck at the following screen on startup.

(Photo: the screen where the server hangs on startup)

I used to resolve such problems by finding and popping out the problematic modules. The server that cannot recover by itself is the one I put in the data center as a long-term running server. I have suffered 6 memory failures from different modules plugged into the same slot. Last year, I started to suspect a cooling problem: there may be no air flow over that module, so it overheats. The chassis for the server is a 1U with 4 hard drives. I invested a lot of resources in this machine, which crowds the small space with 2 more SSDs for caching and 1 more shell-less router for IPMI. Once I unplugged the memory module that could not get any air flow, it was stable for a while (nearly a year), and it crashed again today.

Another server, used as a proving ground, computation server and virtual machine host in my home, showed a similar problem when I was testing Morpheus with the wikidata graph on it. Instead of crashing, it can recover by itself, but it leaves the following messages in my server log:

Apr 30 18:46:00 shisoft-vm kernel: [90407.431330] EDAC sbridge MC1: HANDLING MCE MEMORY ERROR
Apr 30 18:46:00 shisoft-vm kernel: [90407.431337] EDAC sbridge MC1: CPU 8: Machine Check Event: 0 Bank 5: 8c00004000010090
Apr 30 18:46:00 shisoft-vm kernel: [90407.431338] EDAC sbridge MC1: TSC 0
Apr 30 18:46:00 shisoft-vm kernel: [90407.431340] EDAC sbridge MC1: ADDR 1d7a258b00 EDAC sbridge MC1: MISC 204a167686
Apr 30 18:46:00 shisoft-vm kernel: [90407.431342] EDAC sbridge MC1: PROCESSOR 0:206d5 TIME 1462013160 SOCKET 1 APIC 20
Apr 30 18:46:00 shisoft-vm kernel: [90407.659365] EDAC MC1: 1 CE memory read error on CPU_SrcID#1_Ha#0_Chan#2_DIMM#0 (channel:2 slot:0 page:0x1d7a258 offset:0xb00 grain:32 syndrome:0x0 - area:DRAM err_code:0001:0090 socket:1 ha:0 channel_mask:4 rank:1)

If I unplug the modules that were flagged as problematic, other modules fail in the next round of tests. That leaves me no option but to ignore it.

I have another machine with 2 x 16GB of Samsung 2133 MHz DDR4 REG ECC memory, assembled at the beginning of last year, which does not have such problems even when its memory is exhausted and it starts to swap. I highly suspect the failures are caused by heat, or maybe my hardware provider did not give me qualified parts (the motherboard may also cause such problems).

Right now, I have decided to upgrade the machine with Samsung memory to 96GB and add one Intel 750 400GB SSD as secondary storage for project Morpheus. I also plan to replace the machine in the data center with a new one. My new server will take more care of heat sink problems; I hope it won't be so annoying in the future.

I don't suggest purchasing hardware and placing it in a data center when cloud platforms (for example DigitalOcean and Amazon EC2) are affordable for your applications. My use cases are harsh: I have to customize my servers to balance performance and price, and I also have to manage server hardware problems by myself.

Nebuchadnezzar, finally evolved

After one month of desperate tests and adjustments (I nearly wrecked one of the hard drives in my proving-ground machine), I think I have found a reasonable design for the key-value store.

The hard part of this project is memory management. My previous design was to allocate a large amount of memory for each trunk and append data cells at one append header. When a cell is deleted or becomes obsolete, a defragmentation thread moves all living cells to fill the fragments. That means if there is a fragment at the very beginning of the trunk, most of the cells in the trunk will be moved in one cycle, and the fragment space is only reclaimed after one full round of defragmentation. Usually the fragment space is reclaimed so slowly that the whole trunk cannot take update operations too frequently. I used to apply a slow mode in the system, but finally realised it was a stupid idea.

I also tried to place new cells into the fragments, but it caused performance issues. The defragmentation process has to compete with other operation threads for the fragments, and locks are needed to keep things consistent, which can heavily slow down the whole system.

After I wrote my previous blog post at the Celebrate Ubuntu Shanghai Hackathon, I nearly ditched the design I mentioned before and started to implement the partly log-structured design I had seen in RAMCloud. It took me only one night to rewrite that part. I did not fully follow the design in the paper; in particular, I do not append tombstones to the log. Instead, I write tombstones into the place that belonged to the deleted cell, because the paper spends a large number of paragraphs on how to track tombstones and reclaim their space, and I found that has too much overhead. The major improvement over my original design is dividing each trunk into 8MB segments. Each segment tracks its own dead and alive objects, and the maximum object size is reduced from 2GB to 1MB. This design has proved to be efficient and manageable. Segmentation makes it possible to start multiple threads and defragment the segments of one trunk in parallel, which significantly improves defragmentation performance. When a writer tries to acquire new space from a trunk, it only needs to find one segment that has sufficient space. In most common situations, the defragmentation process can always finish on time.
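A rough sketch of the segment bookkeeping described above (in Rust, with invented field names; the actual implementation differs):

// Each trunk is split into fixed-size segments that track how much of their
// space is live; a writer only needs one segment with enough free room, and
// defragmentation can work on several segments of the same trunk in parallel.
const SEGMENT_SIZE: usize = 8 * 1024 * 1024; // 8MB per segment
const MAX_CELL_SIZE: usize = 1024 * 1024;    // 1MB upper bound per cell

struct Segment {
    base_addr: usize,  // start address of this segment inside the trunk
    append_at: usize,  // bump pointer for new cells, relative to base_addr
    dead_bytes: usize, // space held by deleted or obsolete cells
}

impl Segment {
    fn free_bytes(&self) -> usize {
        SEGMENT_SIZE - self.append_at
    }
}

struct Trunk {
    segments: Vec<Segment>,
}

impl Trunk {
    fn acquire(&mut self, size: usize) -> Option<&mut Segment> {
        assert!(size <= MAX_CELL_SIZE);
        self.segments.iter_mut().find(|s| s.free_bytes() >= size)
    }
}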

I also redesigned the backup process. When trunks are divided into segments, synchronizing in-memory data to disk is much more straightforward. There is one thread per trunk that keeps detecting dirty segments and sends them to remote servers for backup, one at a time. A dirty segment is a segment containing new cells; segments with only tombstones are not considered dirty and do not need backup, because the system tracks versions for each cell. In the recovery process, cells with a lower version will be replaced by a higher version, but a higher one will never be replaced by a lower one.

At the application level, the 1MB object size is a limitation. In project Morpheus, it is quite possible for one cell to exceed it, especially for edge lists: one vertex may have a large number of edges in or out. But this limitation can be overcome by implementing a linked list. Each edge-list cell contains an extra field indicating the id of the next list cell. When the list needs to be iterated, we can follow the links to retrieve all of the edges.
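For illustration, the linked edge list could be laid out like this (a sketch with invented names, not the actual cell schema):

type CellId = u128;
type VertexId = u64;

// An edge list split across cells: each cell holds a bounded batch of edges
// plus the id of its continuation cell, or None at the end of the chain.
struct EdgeListCell {
    edges: Vec<VertexId>, // kept small enough to stay under the 1MB cell limit
    next: Option<CellId>,
}

// Follow the chain of cells to collect every edge of a vertex.
fn all_edges(first: CellId, read_cell: impl Fn(CellId) -> EdgeListCell) -> Vec<VertexId> {
    let mut edges = Vec::new();
    let mut cursor = Some(first);
    while let Some(id) = cursor {
        let cell = read_cell(id);
        edges.extend(cell.edges);
        cursor = cell.next;
    }
    edges
}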

Although there are still some bugs in Nebuchadnezzar and I don't recommend using it in production, I am glad to see that this project has made some progress, instead of becoming another of my short-term fever projects.

Some thoughts about the storage system for the in-memory key-value store

I was struggling with how to implement the durability feature for the key-value store. I have finished my first attempt at writing in-memory data to stable storage. The paper from Microsoft did not give me much of a clue, because it only mentions that they designed a file system called TFS, similar to HDFS from Apache Hadoop; it does not say any more about how and when to write memory data into the file system. My first and current design for memory management followed their implementation. After some tests, importing a large amount of data into the store and then frequently updating the inserted cells, I realised the original design from Trinity is problematic.

Memory management, here also called defragmentation, is used to reclaim space from deleted cells. Especially in update operations, the store marks the original space as a fragment and appends the new cell at the append header. If that space is not reclaimed in time, the container fills up and no more data can be written into it. The approach in my previously presented article has a major flaw: it copies every cell that follows the position of the fragment. That means if we mark the first cell in a trunk as a fragment, all of the remaining cells in the trunk will be copied during defragmentation. In some harsh environments, the defragmentation process will never catch up.

The key-value store also supports durability. Each trunk has a mirrored backup file on multiple remote servers to ensure high availability. Each operation on the trunk memory space, including defragmentation, marks the modified region of memory as dirty in a skip-list buffer, ordered by the memory address offset of the operation. After a certain interval of time, a daemon flushes all of the dirty data and their addresses to the remote backup servers. The backup servers take the data into a buffer, and a daemon takes the data from the buffer and writes it to disk asynchronously. With the defragmentation approach above, each defragment operation also leads to updating all of the copied cells, which is a huge I/O performance problem.
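The dirty-region tracking can be pictured like this (a Rust sketch that uses an ordered map in place of the skip list; the names are invented):

use std::collections::BTreeMap;

// Dirty regions of a trunk, keyed by offset so a flush walks them in address
// order, the way the skip-list buffer described above is ordered.
struct DirtyTracker {
    regions: BTreeMap<usize, usize>, // offset -> length of the dirty range
}

impl DirtyTracker {
    fn mark(&mut self, offset: usize, len: usize) {
        // A real implementation would also merge overlapping ranges here.
        self.regions.insert(offset, len);
    }

    // Periodically drain the dirty ranges and ship them to the backup servers.
    fn flush(&mut self, send_to_backup: impl Fn(usize, usize)) {
        for (offset, len) in std::mem::take(&mut self.regions) {
            send_to_backup(offset, len);
        }
    }
}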

Then I read the paper from Stanford to find a more feasible solution. The authors provide full details on log-structured memory management. The main feature of this approach is turning the memory space into append-only segments. In the cleaning process, the cleaner produces fully empty segments for reuse. Every operation, including adds, deletes and updates, is an appended log entry, and the system maintains a hash table to locate cells in segments. The cleaner is much more complex than the defragmentation in my key-value store, but it makes it possible to minimize memory copies and to clean in parallel. The downside is that the maximum size of each cell is limited to the 1MB segment size, while my current approach can provide a maximum of 2GB per cell (even though such big cells are not realistic), and maintaining segments costs memory: when the memory space is large, the number of segments increases accordingly. I prefer to give the log-structured approach a try, because defragmentation will become the major bottleneck in the current design, and large objects are not essential in the current use cases.

Managing memory is far more complex than I thought at first. Other systems, like the JVM and other programming languages, took years to improve their GC, and each improvement may reduce pauses by only a factor or two. I cannot tell which is the best solution without proper tests, because each solution seems specialized for certain use cases. I think I can do more research on this topic in the future.