Transaction In Doubt

My notes on Software Engineering

Tag: software engineering

Rete Meets Redis

In my previous post I briefly touched on the subject of rules and the work I published to NPM as ‘durable’, whose source is available at http://www.github.com/jruizgit/rules. In this post I explore in detail how to use the Rete algorithm and the Redis server as a powerful combination for processing real-time events. A bit of background first: the Rete algorithm was created a few decades ago and has been used in inference-based expert systems for the efficient execution of business rules. Redis is a data-structure server, currently used in a large variety of scalable applications.

Ruleset Definition

A ruleset is a set of conditions followed by actions. In ‘durable’ I use rulesets to express conditions about events, that is, things that happen. Let's illustrate this with a simple, admittedly simplistic, example. Imagine we are rating the effectiveness of a web page, and we establish the following rules:

  1. As soon as a visitor hits our web page, we will start a timer of 5 minutes (assuming the visitor is now reading).
  2. When our visitor clicks on a link within the 5 minute timeout, we are going to increase the page score (yes, the page worked!).
  3. When the timeout expires and our visitor did not click on any link, we are going to decrease the page score (the page was abandoned).

I now translate the rules above to the simple JSON abstraction used in ‘durable’ for expressing rules. Don't worry if you don't understand all the details; as long as you see how it relates to the description above, you should be able to follow. If you are curious about the rules definition abstraction, please visit https://github.com/jruizgit/rules/blob/master/concepts.md.

rating: {
    r1: {
        when: { event: 'start' },
        run: function(s) {
            s.state = 'reading';
            s.startTimer('timeout', 300);
        }
    },
    r2: {
        whenAll: {
            $m: { event: 'click' },
            $s: { state: 'reading' }
        },
        run: function(s) { increaseScore(); }
    },
    r3: {
        whenAll: {
            $m: { $t: 'timeout' },
            $s: { state: 'reading' }
        },
        run: function(s) { decreaseScore(); }
    }
}

The Rete Tree

When running the ruleset above, the first thing the ‘durable’ runtime is going to do is create a Rete tree as shown below:

Rating Ruleset Rete Tree

In a Rete tree there are three different kinds of nodes:

  1. The solid line circles in the diagram above are called alpha nodes. They have one input and can have several outputs. Their main task is routing data into beta or action nodes. Each is associated with an assertion about its input; when the assertion is met, the data is pushed into all children nodes.
  2. The dashed line circles are known as beta nodes. They have two or more inputs and one output. They are in charge of doing join operations, that is, pushing data into action nodes when all input criteria have been met.
  3. The squares represent actions. Actions do real work (such as sending email or triggering other events), and they can change the state of the tree, negating previous state and asserting new state.

The two top level alpha nodes represent the type of assertions that can be made. In the case of durable there are always two top level nodes: one for event messages and another for user state changes.
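The alpha-node routing described above can be sketched in a few lines of JavaScript. To be clear, this is a hypothetical illustration and not the actual implementation in ‘durable’: an alpha node is just a predicate plus a list of children, and matching data is pushed to every child.

```javascript
// Minimal sketch of alpha-node routing (illustrative, not durable's code).
// An alpha node holds an assertion (predicate) and pushes any data that
// satisfies it to all of its children.
function alphaNode(predicate) {
    return {
        children: [],
        assert: function (data) {
            if (predicate(data)) {
                this.children.forEach(function (child) {
                    child.assert(data);
                });
            }
        }
    };
}

// A trivial action node that records whatever reaches it.
function actionNode(log) {
    return { assert: function (data) { log.push(data); } };
}

// Wire up the path for rule r1: type = '$m' -> event = 'start' -> action.
var log = [];
var typeM = alphaNode(function (d) { return d.type === '$m'; });
var eventStart = alphaNode(function (d) { return d.event === 'start'; });
typeM.children.push(eventStart);
eventStart.children.push(actionNode(log));

typeM.assert({ type: '$m', id: 1, event: 'start' });  // reaches the action
typeM.assert({ type: '$m', id: 2, event: 'click' });  // filtered out
```

Only the first message reaches the action node; the second is filtered at the `event = 'start'` assertion.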

Forward Chaining

Now it is time to consider how an event sequence works. Please pay special attention to step 2.iv: remembering facts about the rules' antecedent state is the key to high performance rule evaluation. When the event in step 3 happens, the system doesn't need to reevaluate the entire ruleset, nor does it need to deserialize the user state.

1. Let’s start by posting the following message:

{
    id: 1,
    event: 'start'
}
i. The message comes through the REST handler and is posted to the ruleset’s Rete tree.
ii. The message is pushed to the alpha node `type = '$m'`. The condition is matched, so it is now pushed to the next alpha node.
iii. The message meets the condition `$m.event = 'start'`. It is then pushed to the action node.
iv. An action item is added to the action queue of the ruleset. The REST handler response is sent back to the user at this point.

2. The action is picked up by the background dispatch loop:

i. The action is executed and the state is changed to `s.state = 'reading'`.
ii. The new state is pushed to the alpha node `type = '$s'`. The condition is matched. The state is pushed to the next alpha node.
iii. The state meets the condition `$s.state = 'reading'`, so it is pushed to the beta node.
iv. The ‘all’ join in the beta node is not complete yet. This input is remembered (this fact is stored in memory).

3. Now we assume the user clicked on a link in the page and as a result the following message is posted:

{
    id: 2,
    event: 'click'
}
i. The request comes through the REST handler and is posted to the ruleset’s Rete tree.
ii. The message is pushed to the alpha node `type = '$m'`, which pushes the message to the next alpha node.
iii. The message meets the condition `$m.event = 'click'` and is pushed to the beta node.
iv. The ‘all’ join is now satisfied, so the message is pushed to the action node.
v. An action is now added to the action queue of the ruleset. The REST handler response is sent back to the user at this point.

4. The action is picked up by the background dispatch loop.

i. The ‘all’ join state is deleted.
ii. The action is executed.

Tree State in Redis

In the previous section you might have noticed I glossed over the Rete tree memory management details (steps 2.iv and 3.iv). In order to scale out, that is, to be able to use several cores to evaluate the ruleset, it is necessary to store the tree state in a cache external to the processes. Managing the beta node state needs careful consideration. Let's look at a first approach to handling steps 2.iv and 3.iv above:

Get the beta node state from the cache
If the 'all' join is not complete
    Store the fact that one beta node side is complete
Else
    Delete the beta node state from the cache
    Add an item to the action queue

The pseudo code above has two fatal flaws: 1) a race condition when checking and storing the beta node state can lead to losing event messages, and 2) a failure after deleting the beta node state can lead to dropping the action. To fix both problems we need to use a lock, and the code needs to be idempotent so it can be retried in case of failure.

Acquire a lock with a timeout (in case of failure)
Get the beta node state from the cache
If the 'all' join is not complete
    Store the fact that one beta node side is complete
Else
    Add an item to the action queue if it has not been added
    Delete the beta node state from the cache
Release the lock

Now the code above requires up to 5 cache accesses. In addition, recovering from a failure is delayed by waiting for the lock to time out. In order to achieve real-time event processing performance, only one memory cache access can be afforded for each relevant event (I came to this conclusion after studying the performance profiles from my benchmarks). There has to be a better way of doing this.

Redis offers features such as sorted sets, single-threaded multi-command execution and Lua scripting, which can be used to efficiently address the problem above. The forward chaining algorithm has to be modified for efficient memory management and join evaluation. Specifically: 1) push the beta node state all the way down to the action nodes, and 2) make the action nodes responsible for storing the information in Redis, evaluating upstream joins and enqueuing actions in a single multi execute sequence. To illustrate this point, for the example above, the following sets are created to store the join information:

  • rating.r1.$m
  • rating.r2.$s.$s
  • rating.r2.$m.$m
  • rating.r3.$s.$s
  • rating.r3.$m.$m

The join evaluation works as follows:

Multi execute sequence for step 2.iv.
    Add an entry identifying the user state to rating.r2.$s.$s.
    Join by testing if rating.r2.$s.$s and rating.r2.$m.$m have an entry.
Multi execute sequence for step 3.iv.
    Add an entry identifying the event message to rating.r2.$m.$m.
    Join by testing if rating.r2.$s.$s and rating.r2.$m.$m have an entry.
    Add action entry to the action queue.
    Remove the entries in rating.r2.$m.$m and rating.r2.$s.$s.

Please note the multi execute sequence runs as a single threaded set of instructions in the Redis server. The use of sets ensures the idempotency of operations (they can be retried in case of failure).
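To make the idempotency argument concrete, the step 2.iv and 3.iv sequences can be simulated with plain in-memory sets standing in for the Redis sets (the key names mirror the list above; the action queue key is hypothetical). Because adding an element to a set is a no-op when the element is already present, retrying the step 3.iv sequence after a failure still produces exactly one queued action.

```javascript
// In-memory stand-in for the Redis sets used in the join evaluation
// (names and structure are illustrative, not durable's exact schema).
var sets = {
    'rating.r2.$s.$s': new Set(),
    'rating.r2.$m.$m': new Set(),
    'rating.actions': new Set()   // hypothetical action queue, keyed by session
};

// Step 2.iv: remember the user state side of the r2 join.
function assertState(sid) {
    sets['rating.r2.$s.$s'].add(sid);
}

// Step 3.iv: remember the message side, evaluate the join, queue the
// action and clean up. In Redis this whole block runs atomically as
// one multi execute sequence.
function assertMessage(sid) {
    sets['rating.r2.$m.$m'].add(sid);
    if (sets['rating.r2.$s.$s'].has(sid) || sets['rating.actions'].has(sid)) {
        sets['rating.actions'].add(sid);    // idempotent: add is a no-op on repeat
        sets['rating.r2.$m.$m'].delete(sid);
        sets['rating.r2.$s.$s'].delete(sid);
    }
}

assertState('session1');
assertMessage('session1');
assertMessage('session1');   // retry after a simulated failure: still one action
```

Even with the retried message, the action queue ends up with a single entry and the join sets are left clean, which is exactly the property that makes failure recovery safe.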

Inference: From Expert Systems to Cloud Scale Event Processing

For a few years now I have been interested in event processing: tracking and analyzing information about things that happen. Not only because time and again I have stumbled upon problems that can easily be modeled with events, but because I have come to believe proper event modeling, just as data modeling, is one of the fundamental pillars of distributed application design and development. E-commerce, supply chain management, financial, social and health-care applications are some of the domains where event processing is interesting.

There are a number of high-end commercial products specifically tailored for event processing. Despite very strong theoretical underpinnings, in my opinion many of them suffer from fundamental problems:

  • Complex abstraction: Very powerful abstractions can cover a large number of cases. The learning curve, however, is steep. In addition the abstraction is not well integrated with most popular languages and tooling.
  • Type system impedance: Event rules and event data are defined in a custom type system, while user state is typically handled separately using relational models. As a result, applications spend a large number of cycles transforming data.
  • Event and state storage: Because of the type system impedance, events and state are often stored separately, which leads to increased network access and complex distributed coordination.
  • Difficult to manage: Monolithic servers cannot readily be deployed to the cloud, as they require special network and management arrangements.

First Round

About a year ago, given the rapid developments in cloud technologies and the trend towards enterprise software democratization, I decided to invest my personal time in researching this area and sharing the results with anyone interested. After a few months of intense creativity, learning and coding, I published the results to NPM under the name ‘durable’ version 0.0.x, git repository http://www.github.com/jruizgit/durable.

Important learnings to report:

  • The best place for writing code is the airplane, preferably an overseas round trip to/from Shanghai. Indeed: no email, no messages, no phone, 12-14 hours of uninterrupted coding! It is also an unsurpassable remedy for jet lag when flying out of Seattle around noon. Getting back to Seattle at 8AM is a little rough, given you need to start the day after staring at the computer for more than 10 hours.
  • A new exciting project is a perfect way to justify to your partner the purchase of new equipment (in my case a beautiful MacBook Pro). I must say the Retina display has kept me motivated, as I look forward to working on my project at least a few minutes every day just because the editor, the browser and the terminal look so clear and beautiful.

On a more serious note: From the beginning of the project I established a few core principles worth enumerating and carrying along in future iterations.

  • JSON type system: Event information and user state are defined, stored and queried using the JSON type system.
  • Storage: Events and user state are stored in the same place. This eliminates unnecessary network access and distributed consistency coordination.
  • REST API: Events are raised through a REST API. This allows for easy integration with different event sources.
  • Fault tolerance: Reactions to events are guaranteed to be executed at least once. The tradeoff is reactive functions need to be idempotent.
  • Cloud ready: Leverage technologies that can easily be hosted and managed in the cloud. Node.js, D3.js and MongoDB in this case.

I was very happy with the results of my work for about a week. Then disaster! The code got old, it started to rot and it had to be rewritten (yes, I have worked as a manager for a long time, but I'm still a developer and I can't help this). A couple of fundamental problems made me feel uncomfortable with my recent work:

  • Meta-linguistic abstraction: It had many concepts, which made it complex. It relied heavily on the ‘Promise’ concept, which did not compose well with the Node.js ecosystem. It inherited the MongoDB expression language with its faults and limitations. For example, a logical ‘and’ was defined using a JSON object, which implies no ordering.
  • Message brokering: Events were stored as messages in a queue. A background daemon periodically fetched and correlated messages with user state. As a result, I could not bring the performance of the system quite to where I wanted it.

Note: I will describe the benchmark in future posts; suffice it to say that on a quad-core, 16 GB iMac I got 700 event-action-event cycles/sec.

Second Round

Around September last year, I heard a number of people allude to the importance and relevance of business rules. I was familiar with this domain, but I had always considered it a solution for a very specific problem. With some skepticism, I spent a few days reading documents on inference, forward chaining and Rete algorithms. It occurred to me that inference could help improve the performance of ‘durable’ by at least an order of magnitude. So I decided to start a new code iteration by implementing a Rete algorithm that could scale out simply by adding commodity hardware. The published papers on forward chaining only consider an in-memory, single-process environment, without problems of consistency nor the cost of memory access. So my main area of focus became the use of cache technology for storing inference state. In the end I decided to use Redis because it is fast, it offers powerful data structures (hashsets and ordered hashsets), server-side scripting and, believe it or not, a single threaded model that is great for handling concurrency and consistency.

On top of the basic principles I had established in the first iteration (JSON, storage, REST API, Fault Tolerance and Cloud Ready), I adopted 4 new principles:

  • Meta-linguistic abstraction: A reduced set of concepts provides an intuitive, minimalistic abstraction to ease the learning curve.
  • Rules: The basic building block of the system is the ‘Rule’, which is composed of an antecedent (expression) and a consequent (action). By allowing rules to be expressed over incoming events as well as user state, more complex constructs such as statecharts and flowcharts can be supported, because they can be reduced to a set of rules.
  • Forward chaining: Allows for quick event evaluation without the need to recompute all expressions or to read and deserialize user state. This is the key to a significant performance boost over my previous implementation based on message brokering.
  • Caching: Inference state is stored in an out-of-process cache, which allows scaling out rule evaluation beyond a single process using commodity hardware.

I just published my work to NPM under the name ‘durable’ version 0.10.x, git repository http://www.github.com/jruizgit/rules. So far I’m still happy with my work, but a few questions remain unanswered:

  • Can I improve the performance of the system even more by reimplementing it in C and cutting the cost of event marshaling to Redis?
  • Or will the next performance boost come from moving rule evaluation closer to the memory location?
  • Can I leverage a C implementation to support meta-linguistic abstractions for Python and Ruby?
  • Now that I’m not traveling to Shanghai as often, how can I find 12 hours of uninterrupted coding time?

Note: After more than a month of perf tuning, using the same benchmark as in First Round, on a quad-core, 16 GB iMac I got 7500 event-action-event cycles/sec.