Transaction In Doubt

My notes on Software Engineering

Tag: stream computing

Rete_D

The past few months have been a time of intense study and creative software implementation, which led to the full development of the durable_rules forward inference algorithm. Before I release what I consider the durable_rules beta version, I would like to present my thoughts on what I have done with the Rete algorithm. It is difficult to talk about an algorithm without naming it. That is why, in this note, I refer to the durable_rules forward inference implementation as Rete_D (D for distributed).

As I read blogs and white papers on rules engine implementations, I found interesting similarities, at least in principle, between Rete_D and Drools (ReteOO as well as PHREAK). In particular, the following features are worth mentioning:

  • Alpha node indexing: Rete_D accomplishes this by using a trie written in C (see my previous blog entry).
  • Beta node indexing: in Rete_D, `join` and `not` nodes are indexed using a Lua table.
  • Set oriented propagation: inserts and deletes are queued. During rule evaluation, they are evaluated against a set of frames of facts and events stored in Redis.
  • Heap based agendas: the agenda for rule execution is managed with a Redis sorted set and lists ordered by salience.
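
The agenda idea in the last bullet can be sketched without Redis. Below is a minimal in-memory model of a salience-ordered agenda (the class and method names are mine; the real Rete_D agenda lives in a Redis sorted set):

```javascript
// Minimal in-memory model of a salience-ordered agenda. The real
// Rete_D agenda is a Redis sorted set; this sketch only shows the
// ordering behavior, not the persistence or the per-ruleset lists.
class Agenda {
    constructor() {
        this.entries = [];
    }

    // queue an action; higher salience is dispatched first
    add(salience, action) {
        this.entries.push({ salience, action });
        this.entries.sort((a, b) => b.salience - a.salience);
    }

    // pop the highest salience action, or null when empty
    next() {
        const entry = this.entries.shift();
        return entry ? entry.action : null;
    }
}

const agenda = new Agenda();
agenda.add(10, 'lowPriorityRule');
agenda.add(100, 'fraudRule');
agenda.add(50, 'auditRule');
```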

Rete_D has a number of important features that make it unique. To demonstrate them, I use a simple fraud detection rule written in Ruby.

Durable.ruleset :fraud do
  when_all c.first = m.t == "purchase",
           c.second = (m.t == "purchase") & (m.location != first.location) do
    puts "fraud detected " + first.location + " " + second.location
  end
end

Scripting

In the example above, you might have noticed there are no type or model definitions. Events and facts are JSON objects, and rules are written for the JSON type system (sets of name-value pairs). A good analogy is the MongoDB query language, in which the existence of JSON objects is assumed and an explicit structured schema definition is not needed. In fact, in my earliest Rete_D implementation I used the MongoDB syntax for defining rules. I departed from the MongoDB query language because expression precedence when using operators such as And, Or, All and Any is ambiguous. In today’s durable_rules implementation our example is translated to:

fraud: {
    all: [
        {first: {t: 'purchase'}},
        {second: {$and: [{t: 'purchase'}, {$neq: {location: {first: 'location'}}}]}}
    ]
}

Events and facts are fully propagated down the network; that is, there are no property extraction constructs in either the library or the Rete_D structure. In a rule definition, events and facts have to be named and are referenced by that name when expressing correlations. Any property name can be used. The results:

  1. Simpler rule definitions.
  2. Integration with scripting languages, which provide great support for the JSON type system.

Distribution

With Rete_D, the tree evaluation can be distributed among different compute units (call them processes or machines). There are three important reasons for doing so:

  1. Availability: avoid single points of failure.
  2. Scalability: distribute the work across available resources.
  3. Performance: consolidate event ingestion with real-time rule evaluation.

To make this viable, pushing evaluation as close to the data as possible is critical. Alpha nodes are evaluated by the Rete_D C library and can be distributed symmetrically across compute units (such as Heroku web dynos). The beta nodes are evaluated by a Lua script (created during rule registration) in the Redis servers where the state is kept. Events and facts can be evaluated in different Redis servers depending on the context they belong to. Finally, actions are executed by the compute units responsible for alpha node evaluation.

[Figure: trie]

Events

Events are first class citizens in the Rete_D data model. The difference between events and facts is that events can only be seen once by any action. This principle implies that events can be deleted as soon as a rule is resolved and before the corresponding action is scheduled for dispatch. This can dramatically reduce the combinatorics of join evaluation and the amount of unnecessary computation in some scenarios.

To illustrate this point, let’s add the following rule to the example above:


  when_start do
    assert :fraud1, {:id => 1, :sid => 1, :t => "purchase", :location => "US"}
    assert :fraud1, {:id => 2, :sid => 1, :t => "purchase", :location => "CA"}
    assert :fraud1, {:id => 3, :sid => 1, :t => "purchase", :location => "UK"}
    assert :fraud1, {:id => 4, :sid => 1, :t => "purchase", :location => "GE"}
    assert :fraud1, {:id => 5, :sid => 1, :t => "purchase", :location => "AU"}
    assert :fraud1, {:id => 6, :sid => 1, :t => "purchase", :location => "MX"}
    assert :fraud1, {:id => 7, :sid => 1, :t => "purchase", :location => "FR"}
    assert :fraud1, {:id => 8, :sid => 1, :t => "purchase", :location => "ES"}
    assert :fraud1, {:id => 9, :sid => 1, :t => "purchase", :location => "BR"}
    assert :fraud1, {:id => 10, :sid => 1, :t => "purchase", :location => "IT"}
  end

Because each fact matches both the first and the second rule expression, there are 10 x 10 comparisons, which produce 10 x (10 - 1) = 90 results. In contrast, let’s replace ‘when_start’ with the following rule:


  when_start do
    post :fraud1, {:id => 1, :sid => 1, :t => "purchase", :location => "US"}
    post :fraud1, {:id => 2, :sid => 1, :t => "purchase", :location => "CA"}
    post :fraud1, {:id => 3, :sid => 1, :t => "purchase", :location => "UK"}
    post :fraud1, {:id => 4, :sid => 1, :t => "purchase", :location => "GE"}
    post :fraud1, {:id => 5, :sid => 1, :t => "purchase", :location => "AU"}
    post :fraud1, {:id => 6, :sid => 1, :t => "purchase", :location => "MX"}
    post :fraud1, {:id => 7, :sid => 1, :t => "purchase", :location => "FR"}
    post :fraud1, {:id => 8, :sid => 1, :t => "purchase", :location => "ES"}
    post :fraud1, {:id => 9, :sid => 1, :t => "purchase", :location => "BR"}
    post :fraud1, {:id => 10, :sid => 1, :t => "purchase", :location => "IT"}
  end

Remember each event can only be seen once; therefore there are 10 comparisons, which produce only 5 results. As you can see, events can dramatically improve performance in some scenarios.
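
The arithmetic above can be reproduced with a toy counter. The functions below are purely illustrative (they are not part of durable_rules); they model facts as a full cross join versus events as a stream where each message is consumed at most once:

```javascript
// Toy model of join combinatorics for the fraud rule.
// factResults: every ordered (first, second) pair with different
// locations is a result, as with asserted facts.
// eventResults: each message can be seen at most once, so a match
// consumes both events, as with posted events.
const locations = ['US', 'CA', 'UK', 'GE', 'AU', 'MX', 'FR', 'ES', 'BR', 'IT'];

function factResults(items) {
    let results = 0;
    for (const first of items) {
        for (const second of items) {
            if (first !== second) results += 1; // 10 x (10 - 1)
        }
    }
    return results;
}

function eventResults(items) {
    const queue = items.slice();
    let results = 0;
    while (queue.length >= 2) {
        const first = queue.shift();
        // find a second event with a different location
        const index = queue.findIndex(loc => loc !== first);
        if (index < 0) break;
        queue.splice(index, 1); // both events are now consumed
        results += 1;
    }
    return results;
}
```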

Context References

It is common to provide rule configuration information by asserting facts. In some cases this might lead to increased join combinatorics. Rete_D implements an eventually consistent state cache, which can be referenced during alpha node evaluation, thus reducing the beta evaluation load. Consider the following rule implemented in Ruby:

Durable.ruleset :a8 do
  when_all m.amount > s.id(:global).min_amount do
    puts "a8 approved " + m.amount.to_s
  end
  when_start do
    patch_state :a8, {:sid => :global, :min_amount => 100}
    post :a8, {:id => 2, :sid => 1, :amount => 200}
  end
end

The event is compared against the ‘min_amount’ property of the context state object called ‘global’. This evaluation is done by an alpha node using the state cached in the compute unit, which is refreshed every two minutes (by default). Again, this can reduce join combinatorics and significantly improve performance in some scenarios.
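
The refresh behavior can be sketched as a cache that reloads its value once a configurable interval has elapsed. Everything below (the names, the manual clock, the load function) is illustrative; the real cache is populated from Redis:

```javascript
// Sketch of an eventually consistent state cache with periodic
// refresh. loadState fetches the authoritative state; now is an
// injectable clock so the behavior can be demonstrated without
// waiting two real minutes.
class StateCache {
    constructor(loadState, refreshMs, now) {
        this.loadState = loadState;
        this.refreshMs = refreshMs;
        this.now = now;
        this.value = null;
        this.loadedAt = -Infinity; // force a load on first access
    }

    get() {
        if (this.now() - this.loadedAt >= this.refreshMs) {
            this.value = this.loadState();
            this.loadedAt = this.now();
        }
        return this.value;
    }
}

// Simulated authoritative state and a manual clock.
let clock = 0;
let authoritative = { min_amount: 100 };
const cache = new StateCache(() => ({ ...authoritative }), 120000, () => clock);

const first = cache.get().min_amount;     // loads at t=0
authoritative = { min_amount: 500 };      // state changes remotely
const stale = cache.get().min_amount;     // still the cached value
clock = 120000;                           // two minutes later
const refreshed = cache.get().min_amount; // picks up the new value
```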

Conclusion

Rete_D provides a number of unique features that I believe make it relevant for solving streaming analytics problems where rules are expected to handle a large amount of data in real time. The following features are not yet implemented in Rete_D. They represent exciting areas for future research.

  • Beta node sharing
  • Lazy rule evaluation
  • Rule based partitioning
  • Backward inference

Boosting Performance with C

Right after posting my last blog entry I began work on the third iteration of Durable Rules. My main question was: how can I improve the framework performance by an order of magnitude across all scenarios? As you can see, it has been quite a long time since my last blog post. Well… I have been very busy finding an answer to the question above.

Early on, I decided to re-implement the core rules engine in C. Compared with scripting and memory managed languages, C provides good control over memory allocation and a lot of flexibility in data structure definition. Obviously, control and flexibility come at a price: it took me a lot longer to develop the code.

So, in the end, this exercise was not simply porting code from one language to another. I had to rethink and redesign the data structures. I also had to design and write a JSON parser. And I must say: I also took some time to invent a new feature to enable batch processing for streaming. The detailed explanation is in the following sections.

Data Structure Definition

The most important part of the project was to define a suitable data structure to represent a ruleset. The central principle: the system has to guarantee O(n) (linear) performance as a function of the input (event) size. To illustrate how this is achieved, let’s consider the simple rule definition below:

var d = require('durable');
d.run({
    rating: {
        track: {
            whenAll: {
                a: { event: 'start' },
                b: { event: 'end' }
            },
            run: function(s) { }
        }
    }
});

The code above waits for two events, ‘start’ and ‘end’, to execute the ‘track’ action. Conceptually this rule is represented by a Rete tree, whose root is an alpha node for message based events (type = $m). This node is followed by two alpha nodes with the rules event = ‘start’ and event = ‘end’ respectively. The alpha nodes are joined by the ‘all’ beta node, which is followed by the ‘track’ action node.

[Figure: Rete tree]

In my C implementation the Rete tree is a trie. Every time an event is observed, each event message property is evaluated in the trie. The conceptual counterpart of a trie node is an alpha node in the Rete tree. Thus each trie node contains the rule to be applied to a single named property. Each trie node also points to the subsequent properties to be evaluated (typically derived from an ‘and’ expression). The leaves of the trie point to a linked list of beta nodes terminated by an action node.

[Figure: trie structure]

Very important: the trie node keeps the map of subsequent properties in a hash table. This is a slight deviation from the typical trie nodes used in text search, which keep a map of subsequent valid characters using an array. I decided to handle collisions in the hash table using a simple linear probing algorithm. With linear probing I get better memory usage and predictable, near-constant-time performance as long as the load in the tables is kept low.
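
A sketch of the linear probing strategy, written in JavaScript rather than C for brevity (the table size, string keys and hash function are simplifications of the real node structures):

```javascript
// Minimal open-addressing hash table with linear probing. On a
// collision, probe forward one slot at a time until an empty slot
// (insert) or the matching key (lookup) is found. Keeping the load
// low keeps probe chains short and lookups near constant time.
const TABLE_SIZE = 16;

function hash(key) {
    let h = 0;
    for (let i = 0; i < key.length; i++) {
        h = (h * 31 + key.charCodeAt(i)) >>> 0;
    }
    return h % TABLE_SIZE;
}

function insert(table, key, value) {
    let slot = hash(key);
    while (table[slot] && table[slot].key !== key) {
        slot = (slot + 1) % TABLE_SIZE; // linear probe
    }
    table[slot] = { key, value };
}

function lookup(table, key) {
    let slot = hash(key);
    while (table[slot]) {
        if (table[slot].key === key) return table[slot].value;
        slot = (slot + 1) % TABLE_SIZE; // follow the probe chain
    }
    return undefined; // hit an empty slot: the key is absent
}

const table = new Array(TABLE_SIZE).fill(null);
insert(table, 'event', 1);
insert(table, 'amount', 2);
insert(table, 'location', 3);
```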

In order to increase memory locality and make better use of the processor cache, the trie structure is packed into two arrays: one array holds all the alpha, beta and action nodes, while a second array contains all the hash table buckets for the trie nodes. Nodes and hash tables never hold pointers to other nodes, only array offsets.

[Figure: array layout]
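
The offset-based layout can be sketched as follows (all the field names, the example rules and the hash value are illustrative, not the actual C structures):

```javascript
// Sketch of the offset-packed layout: all nodes live in one flat
// array and hash buckets in another, and they reference each other
// by array offset instead of by pointer.
const nodes = [];    // alpha, beta and action nodes
const buckets = [];  // hash table buckets for the trie nodes

function addNode(node) {
    nodes.push(node);
    return nodes.length - 1; // the node's offset is its identity
}

// two chained alpha nodes terminated by an action node
const actionOffset = addNode({ type: 'action', name: 'track' });
const endOffset = addNode({ type: 'alpha', rule: "event = 'end'", next: actionOffset });
const startOffset = addNode({ type: 'alpha', rule: "event = 'start'", next: endOffset });

// a bucket records a property hash and the offset of the node that
// evaluates it; walking the chain never dereferences a raw pointer
buckets.push({ hash: 1234, nodeOffset: startOffset });

const start = nodes[buckets[0].nodeOffset];
const chain = [start.rule, nodes[start.next].rule, nodes[nodes[start.next].next].name];
```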

JSON Parsing

To reduce the cost of data transformation I decided to use JSON as the event type system for this new rules engine. Thus a JSON object can be passed directly from a Web request input stream without any extra processing. C doesn’t have built-in facilities for JSON parsing, so I wrote a lightweight JSON parser following a few strict principles:

  • Avoid Object Model (DOM) definition and buffering
  • Avoid using the heap, only use stack memory
  • Optimize for parsing key value pairs in a single pass
  • Calculate property name hashes while parsing

The JSON parser is really just a simple state machine tailored for its specific use in the rules engine. An important aside: I chose the DJB algorithm to hash property names because it is known to have good distribution and it adds minimal overhead while parsing. The property name hash codes, as explained in the section above, are critical for ultra fast Rete tree evaluation.
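
For reference, here is the DJB (djb2) hash in its incremental form, which is what makes it cheap to fold into the character scanning loop (the function names are mine):

```javascript
// DJB2 hash, computed one character at a time so it can run inside
// the parsing loop: each scanned character advances the hash, and
// by the time the property name ends its hash code is ready.
const DJB_SEED = 5381;

function djbStep(hash, charCode) {
    // hash * 33 + c, kept in the unsigned 32-bit range
    return (((hash << 5) + hash) + charCode) >>> 0;
}

function djbHash(name) {
    let h = DJB_SEED;
    for (let i = 0; i < name.length; i++) {
        h = djbStep(h, name.charCodeAt(i));
    }
    return h;
}
```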

Batching

Compared with events that don’t match any rule, evaluating a single positive event is expensive because it needs to be recorded in the Redis cache. In some cases it triggers the evaluation of a beta join, and in other cases the queueing of an action. Batching helps amortize the cost of all this activity and allows for processing large streams of data. Let’s consider the following snippet of code:

d.run({
    approval: {
        rule: {
            whenSome: { $and: [{ subject: 'approve' }, { $lte: { amount: 1000 }}]},
            run: function(s) {}
        }
    }
}, '', null, function(host) {
    host.postBatch('approval', [{ id: '0', sid: 1, subject: 'approve', amount: 100 },
                                { id: '1', sid: 1, subject: 'approve', amount: 100 },
                                { id: '2', sid: 1, subject: 'approve', amount: 100 },
                                { id: '3', sid: 1, subject: 'approve', amount: 100 },
                                { id: '4', sid: 1, subject: 'approve', amount: 100 }]);
});

The ‘rule’ action will be able to process at least one event that matches the expression at the time of evaluation. The postBatch function allows clients to post an array of event messages for evaluation and dispatch.
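
The client-side half of batching can be sketched as a small buffer that hands groups of messages to a postBatch-style sink. The buffering policy below (a fixed flush size) is my own illustration, not the durable_rules implementation:

```javascript
// Sketch of a client-side batcher: buffer event messages and hand
// them to the sink in groups, trading a little latency for far
// fewer round trips. The sink stands in for host.postBatch.
function makeBatcher(sink, flushSize) {
    let buffer = [];
    return {
        post(message) {
            buffer.push(message);
            if (buffer.length >= flushSize) {
                this.flush();
            }
        },
        flush() {
            if (buffer.length > 0) {
                sink(buffer);
                buffer = [];
            }
        }
    };
}

// Collect the batches the sink receives, for illustration.
const batches = [];
const batcher = makeBatcher(batch => batches.push(batch), 3);
for (let id = 0; id < 7; id++) {
    batcher.post({ id: String(id), sid: 1, subject: 'approve', amount: 100 });
}
batcher.flush(); // drain the remaining partial batch
```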

Benchmarks

In order not to lose track of my main objective, I constantly measured performance while developing the project. When talking about performance there is always a lot of confusion, so first I will explain the methodology I used for measuring, and then I will present the results for three important tests.

In all benchmarks I used the same equipment: an iMac, OS X 10.9.4, 3.4GHz i7, 16GB 1333MHz DDR3 RAM. I drove the CPU utilization to 90% across all cores by running the same test concurrently in as many node.js processes as needed. In addition, I added as many Redis servers as required to prevent Redis from becoming a bottleneck. I defined the number of operations to be performed and measured the time it took to perform them. ‘Throughput’ is the number of operations divided by the time it took to perform them. ‘Cost’ is the elapsed time divided by the number of operations. In the notes below I use the unit ‘K’ to denote thousand and ‘M’ to denote million.
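
In code form, the two metrics are simply (the example numbers are made up, not benchmark results):

```javascript
// Throughput and cost as defined above: operations per second,
// and seconds per operation.
function throughput(operations, elapsedSeconds) {
    return operations / elapsedSeconds;
}

function cost(operations, elapsedSeconds) {
    return elapsedSeconds / operations;
}

// e.g. 10M operations completed in 2 seconds
const opsPerSecond = throughput(10000000, 2); // 5M ops/s
const secondsPerOp = cost(10000000, 2);       // 200ns per op
```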

Reject Events Test

In this test I defined a ruleset with 30 rules and continuously asserted event messages which did not match any rule. I ran several experiments with different event message sizes, using two different modes: single and batch. The objective was to measure the raw JSON parsing and tree expression evaluation speed. Just as a reference, not as a baseline, I ran the same experiments with the JScript JSON.parse function.

[Figure: reject events benchmark results]

In batch mode durable rules is able to process a little less than 10M small messages (30B) every second; in single message mode it is 25% slower (7.5M messages every second). In both batch and single mode durable rules is able to process 1M larger messages (1KB) every second. Important note: the complexity of the algorithm is linear (O(n)) in the size of the event messages. Interesting factoid: parsing the same messages in JScript (JSON.parse) is also linear, but every byte takes more cycles to parse; indeed it can parse 10M small messages every second but only 0.25M large messages every second.

Accept Events Test

Again I defined a ruleset with 30 rules and continuously asserted event messages. This time all event messages matched a rule in the set and led to queueing an action. I ran experiments with different message sizes and tried two modes: single and batch. The objective was to measure the Redis caching and the beta join calculation done in Redis scripts.

[Figure: accept events benchmark results]

In batch mode durable rules is able to process 250K small event messages (50B) every second, while it can process 60K single small event messages every second. It can process 120K and 40K large event messages (1KB) every second in batch and single mode respectively. Again, the algorithm is linear (O(n)) to the size of the event message.

Rete Cycle Test

Finally I tested the full Rete cycle. I defined a ruleset with 30 rules, but in this case not only were the event messages accepted, but the actions were dequeued, the messages removed and the new associated state re-asserted. All of this was done while keeping the system consistency guarantees.

[Figure: Rete cycle benchmark results]

In batch mode durable rules can run 100K small event messages (50B) through the Rete cycle, while in single mode it can run 18K. In the case of larger event messages (1KB), the results are 15K and 65K in single and batch mode respectively. The algorithm is linear (O(n)) to the size of the event message.

In conclusion, the new C implementation provides a significant performance boost over my previous JScript based implementation. The new batch/streaming feature allows for flowing a lot more data through the system, making more efficient use of the Redis cache.

To learn more, please visit: http://www.github.com/jruizgit/rules.