Deltas

Create, update and delete documents in the EmoDB System of Record by creating deltas.

Introduction

At the core, every document in the System of Record is stored as a sequence of delta operations. An Emo delta is a JSON specification that defines incremental updates to any JSON document. Writers simply append a delta (even for deletes). On every read, the deltas are evaluated in the order they were written to construct the current version of the document.

EmoDB has no built-in support for document-level locks, but allows for Optimistic Concurrency Control (OCC) for operations such as read-modify-write by leveraging conditional deltas. See more in the Guarantees page. Eventual consistency allows EmoDB to work well across multiple data centers. Writers express their intent using “deltas,” where deltas always converge to a consistent state.

For example, consider a typical review submission process:

  1. The user submits a review. The system writes the initial review content by creating delta t1:

    {
      "product": "Sceptre 32\" LCD 720p",
      "rating": 5,
      "text": "Very nice TV great picture. Very Very light amazing!",
      "contributor": "zkyle"
    }
    
  2. In Data Center A, the moderation system modifies the review by marking it approved by writing delta t2:

    {
      .., "status": "APPROVED"
    }
    
  3. In Data Center B, the submission server follows up with new information collected on the thank you page by writing delta t3:

    {
      .., "facebookId": 387075234674416
    }
    

Initially, a reader in Data Center A may only have two deltas visible: t1 and t2. They’re sorted in time order and evaluated to produce a moderated review. The reader merges t1 and t2 and sees the following:

    {
      "product": "Sceptre 32\" LCD 720p",
      "rating": 5,
      "text": "Very nice TV great picture. Very Very light amazing!",
      "contributor": "zkyle",
      "status": "APPROVED"
    }

Simultaneously, a reader in Data Center B may only be able to see deltas t1 and t3. They’re also sorted in time order and evaluated to produce a final object. The reader merges t1 and t3 and sees the following:

    {
      "product": "Sceptre 32\" LCD 720p",
      "rating": 5,
      "text": "Very nice TV great picture. Very Very light amazing!",
      "contributor": "zkyle",
      "facebookId": 387075234674416
    }

It isn’t until some time later (usually a few seconds, but if there are network issues between data centers, maybe a day or two later) that all three deltas get replicated to all data centers. At that point, any reader will see all three deltas merged together:

    {
      "product": "Sceptre 32\" LCD 720p",
      "rating": 5,
      "text": "Very nice TV great picture. Very Very light amazing!",
      "contributor": "zkyle",
      "status": "APPROVED",
      "facebookId": 387075234674416
    }

Notice that, in some sense, there is a replication conflict between t2 and t3 since both deltas modify the same object. But because each delta specifies only the fields it modifies, the deltas merge together cleanly and produce the desired result.

Also notice that multiple data centers were able to concurrently modify the same review without cross-data center synchronous communication.
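
The merge behavior in this example can be modeled in a few lines of Java. The sketch below is a simplified illustration and not EmoDB's implementation: the DeltaMergeSketch and FieldDelta classes, the resolve method, and the integer timestamps are hypothetical stand-ins (real deltas are ordered by time UUID), and each delta is reduced to the set of top-level fields it writes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model for illustration only; this is not EmoDB code.
final class DeltaMergeSketch {

    // Hypothetical stand-in for a map delta: a write time plus the fields it sets.
    static final class FieldDelta {
        final long timestamp;
        final Map<String, Object> fields;

        FieldDelta(long timestamp, Map<String, Object> fields) {
            this.timestamp = timestamp;
            this.fields = fields;
        }
    }

    // Sorts the visible deltas by write time, then applies them in that order.
    static Map<String, Object> resolve(List<FieldDelta> visible) {
        List<FieldDelta> sorted = new ArrayList<>(visible);
        sorted.sort(Comparator.comparingLong((FieldDelta d) -> d.timestamp));
        Map<String, Object> document = new LinkedHashMap<>();
        for (FieldDelta delta : sorted) {
            document.putAll(delta.fields);
        }
        return document;
    }

    public static void main(String[] args) {
        FieldDelta t1 = new FieldDelta(1, Map.of("rating", 5, "contributor", "zkyle"));
        FieldDelta t2 = new FieldDelta(2, Map.of("status", "APPROVED"));
        FieldDelta t3 = new FieldDelta(3, Map.of("facebookId", 387075234674416L));

        System.out.println(resolve(List.of(t1, t2)));      // reader in Data Center A
        System.out.println(resolve(List.of(t1, t3)));      // reader in Data Center B
        System.out.println(resolve(List.of(t3, t1, t2)));  // any reader after full replication
    }
}
```

Because resolve sorts before applying, the order in which deltas arrive at a data center does not affect the final document; only the set of visible deltas does.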

Object Model

The core EmoDB object model is based on JSON. It uses all the same types as JSON:

  • null
  • boolean (true, false)
  • number (32-bit int, 64-bit long, 64-bit double except NaN, Inf)
  • string (UTF-8)
  • array (List<Object>)
  • object (Map<String, Object>)

Dates should be modeled as one of the types above, preferably as ISO 8601 strings (such as with Joda Time and org.joda.time.format.ISODateTimeFormat). If you’re using Jackson, call jsonFactory.disable(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS) to serialize dates as ISO 8601 strings.

The top-level object in a System of Record document is always a JSON object, not one of the other types. The System of Record adds several “intrinsic” fields to every object:

  • ~id - The content key of the document in the System of Record.
  • ~table - The table the document belongs to.
  • ~deleted - true if the document does not exist or has been explicitly deleted, false otherwise.
  • ~version - A data center-specific version number that counts the number of times the document has been updated. This monotonically increasing number can be used to tell if a particular version of an object is newer or older than another version from the same data center.
  • ~signature - A 128-bit cryptographic hash of the sequence of time UUIDs of the updates to this object. You can implement a read-modify-write transaction by reading an object then writing back to the object with a conditional delta that checks that ~signature has not changed since the object was read.
  • ~firstUpdateAt - An ISO 8601-format timestamp string extracted from the time UUID of the first delta written to the object. Note that deleting and re-creating the object does not reset the value of ~firstUpdateAt. This entry will be omitted if the document has never been written to (i.e. ~version is 0).
  • ~lastUpdateAt - An ISO 8601-format timestamp string extracted from the time UUID of the last delta written to the object. This entry will be omitted if the document has never been written to (i.e. ~version is 0). Note that ~lastUpdateAt is not a reliable way to compare two versions of an object: if data center A writes updates t1 and t3 and data center B writes update t2, a reader in data center A may read one version of the object consisting of [t1, t3] then later read a second version of the object consisting of [t1, t2, t3]. Both versions will have the same ~lastUpdateAt, but they will have different values of ~version and ~signature.
  • ~lastMutateAt - An ISO 8601-format timestamp string extracted from the time UUID of the last delta written to the object which changed the document content (excluding intrinsics). For example, if the latest delta is {..,"available":true} and the prior state of the document already contained the value “available” set to true, then ~lastUpdateAt would be updated but ~lastMutateAt would retain the timestamp from the previous delta. For the same reasons as ~lastUpdateAt, ~lastMutateAt is also unreliable for comparing object versions. In fact, under the right circumstances replication delays between data centers can actually cause the ~lastMutateAt value to move backwards if the same updates are applied in each data center.
  • Plus all map entries specified in the table template when the System of Record table was created.

The Java System of Record API accepts the following as JSON objects:

  • null
  • java.lang.Boolean
  • java.lang.Byte (will be converted to Integer)
  • java.lang.Short (will be converted to Integer)
  • java.lang.Integer
  • java.lang.Long (will be converted to Integer if between Integer.MIN_VALUE and Integer.MAX_VALUE)
  • java.lang.Float (will be converted to Double)
  • java.lang.Double
  • java.math.BigInteger (will be converted to Integer, Long or Double depending on its magnitude)
  • java.lang.String
  • java.util.List<?> (objects in the list must also be valid JSON objects)
  • java.util.Map<String, ?> (keys must be non-null strings, values must be valid JSON objects, null values are allowed)

Passing other Java types to methods that accept JSON will trigger a com.bazaarvoice.emodb.sor.delta.JsonValidationException. Note that although java.util.Set is not considered a valid JSON type, it is possible to perform set operations on arrays using java.util.List. More on this below.
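
As a rough illustration of the rules above, the following sketch checks whether a Java value falls within the accepted types. It is a hypothetical helper written for this page, not the validation code EmoDB uses: the class and method names are invented, and rejecting NaN and infinite Float values is an assumption that follows from Float being converted to Double.

```java
import java.math.BigInteger;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; EmoDB's own validation may differ in detail.
final class JsonTypeCheckSketch {

    // Returns true if the value uses only the Java types listed above.
    static boolean isJsonValue(Object value) {
        if (value == null || value instanceof Boolean || value instanceof String) {
            return true;
        }
        if (value instanceof Byte || value instanceof Short || value instanceof Integer
                || value instanceof Long || value instanceof BigInteger) {
            return true;
        }
        if (value instanceof Float) {
            // Assumption: the NaN/Inf restriction on doubles also applies to floats.
            Float f = (Float) value;
            return !f.isNaN() && !f.isInfinite();
        }
        if (value instanceof Double) {
            Double d = (Double) value;
            return !d.isNaN() && !d.isInfinite();
        }
        if (value instanceof List) {
            for (Object element : (List<?>) value) {
                if (!isJsonValue(element)) {
                    return false;
                }
            }
            return true;
        }
        if (value instanceof Map) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                // Keys must be non-null strings; null values are allowed.
                if (!(entry.getKey() instanceof String) || !isJsonValue(entry.getValue())) {
                    return false;
                }
            }
            return true;
        }
        return false; // java.util.Set, java.util.Date, arbitrary beans, ...
    }
}
```

A check like this can be useful in tests to catch unsupported types, such as a java.util.Set or a java.util.Date, before they reach the System of Record.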

Deltas

EmoDB deltas use a recursive, pattern matching approach. There are delta operations for setting a value, deleting a value, and updating the value of a key in a map, where the map value update is itself a delta operation.

Note that there are no operations for modifying a list other than completely replacing or deleting it. It is difficult to design list update operations that resolve cleanly when there are write conflicts. However, if you need to allow multiple actors to concurrently modify a list there are a few alternatives. If the list is order-agnostic and doesn’t need to track duplicates then it can be modeled as a set. If a set is a poor representation then you can model the list using a map. Entries in a map are always identified deterministically by key, so concurrent attempts to add, modify or replace by key can usually be designed to resolve cleanly when there are write conflicts. If you don’t have an obvious candidate for the key, a time UUID (see TimeUUIDs.newUUID()) is often a good choice – they’re globally unique and have a good natural sort order (use TimeUUIDs.ordering(), see TimeUUIDs).

From Java, create deltas programmatically using the Deltas class. The following kinds of deltas are available:

Literal

Set the value to a specified JSON object, irrespective of the previous value.

When resolving replication conflicts, the literal delta implements strict last-writer-wins. For example, an ETL process which copies entire documents from a remote source into EmoDB typically writes each document in one complete delta as a literal.

A literal delta is sometimes called a “smash” operation.

The delta syntax has been designed so that literal deltas look exactly like their corresponding JSON.

Literal delta examples:

Java

Deltas.literal(ImmutableMap.<String, Object>of("rating", 5, "text", "Love it!"))
Deltas.literal("APPROVED")
Deltas.literal(null)
Deltas.literal(3.5e10)

Delta Syntax

{"rating": 5, "text": "Love it!"}
"APPROVED"
null
3.5e10

Delete

Deletes an object or removes a key from a map.

Deleting an object:

Java

Deltas.delete()

Delta Syntax

~

Removing a key from a map:

Java

Deltas.mapBuilder().update("status", Deltas.delete()).build()
Deltas.mapBuilder().remove("status").build()  // Equivalent

Delta Syntax

{..,"status":~}

Once a value has been deleted, it is considered “undefined.” The is(undefined) condition (see below) will evaluate to true. Deleting the top-level object, however, works differently–see the Object Model section above regarding intrinsics.

Map

Adds, removes, and updates entries in a map. Optionally, delete a map if it’s empty after other operations are performed. Besides the literal delta, the map delta is the most commonly used type of delta.

The MapDeltaBuilder class contains a number of methods. Only a few are described here.

Set the status field in a map without modifying other fields in the map:

Java

Deltas.mapBuilder().put("status", "APPROVED").build()

Delta Syntax

{..,"status":"APPROVED"}

Set the status field in a map if it hasn’t been set already:

Java

Deltas.mapBuilder().putIfAbsent("status", "APPROVED").build()

Delta Syntax

{..,"status":if ~ then "APPROVED" end}

Remove the status field from a map:

Java

Deltas.mapBuilder().remove("status").build()

Delta Syntax

{..,"status":~}

Remove the status field from a map if it equals APPROVED:

Java

Deltas.mapBuilder().remove("status", "APPROVED").build()

Delta Syntax

{..,"status":if "APPROVED" then ~ end}

Add a new nested object:

Java

String photoKey = TimeUUIDs.newUUID().toString();
Deltas.mapBuilder()
        .put(photoKey, ImmutableMap.of("url", "http://example.com/1234.jpg"))
        .build()

Delta Syntax

{..,"82507710-bca6-11e1-87ef-001c42000009":{"url":"http://example.com/1234.jpg"}}

Set the status field on a nested object, but don’t create the nested object if someone else deleted it:

Java

Deltas.mapBuilder()
    .updateIfExists(photoKey, Deltas.mapBuilder().put("status", "APPROVED").build())
    .build()

Delta Syntax

{..,"82507710-bca6-11e1-87ef-001c42000009":if + then {..,"status":"APPROVED"} end}

Set

Adds and removes values in a set. Optionally, delete a set if it’s empty after other operations are performed. Unlike maps, sets can only contain literal values, such as numbers, strings, booleans, lists, and maps. However, it is not required that a set is homogeneous; a set can contain any combination of unique literals.

Note that JSON does not have a representation for sets. To work around this, EmoDB can operate on arrays as sets by applying set delta operations to arrays. Because of this, there is no definitive way to determine whether an array returned by EmoDB originated as an array literal or a set. However, in most cases the writers know whether attributes are arrays or sets and the readers don’t care, so this ambiguity is typically not a concern.

For example, the following demonstrates how a set created using set deltas is resolved as an array:

Java

Deltas.mapBuilder()
        .update("codes", Deltas.setBuilder().addAll(501, 789).build())
        .build()

// ... resolves to a Java object equivalent to...

ImmutableMap.of("codes", ImmutableList.of(501, 789))

Delta Syntax

{..,"codes":(..,501,789)}

... resolves to the JSON string...

{"codes":[501,789]}

The SetDeltaBuilder class contains a number of methods. Only a few are described here.

Add the value top10 to a set without modifying other values in the set:

Java

Deltas.setBuilder().add("top10").build()

Delta Syntax

(..,"top10")

Replace any existing value with the set of (200, 204):

Java

Deltas.setBuilder().addAll(200, 204).removeRest().build()

Delta Syntax

(200,204)

Remove the value top10 from a set without modifying other values in the set:

Java

Deltas.setBuilder().remove("top10").build()

Delta Syntax

(..,~"top10")
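
To make the semantics of the three set examples concrete, here is a small model of a set delta applied to an array. It is a sketch under stated assumptions rather than EmoDB's implementation: the SetDeltaSketch class and its apply method are hypothetical, and the element order of the resulting array (insertion order here) is an assumption, since the order EmoDB produces is not described on this page.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model for illustration; not EmoDB code.
final class SetDeltaSketch {

    // Applies a set delta to the current array value, treating the array as a set.
    //   added      - values the delta adds, e.g. (..,"top10")
    //   removed    - values the delta removes, e.g. (..,~"top10")
    //   removeRest - true when the delta has no ".." and so discards all other values
    static List<Object> apply(List<?> current, Set<?> added, Set<?> removed, boolean removeRest) {
        Set<Object> result = new LinkedHashSet<>();
        if (!removeRest && current != null) {
            result.addAll(current);      // keep existing values; duplicates collapse
        }
        result.removeAll(removed);
        result.addAll(added);
        return new ArrayList<>(result);  // resolved back to an array (assumed order)
    }

    public static void main(String[] args) {
        List<Object> tags = List.of("new", "sale");
        System.out.println(apply(tags, Set.of("top10"), Set.of(), false)); // (..,"top10")
        System.out.println(apply(tags, Set.of(), Set.of("sale"), false));  // (..,~"sale")
        System.out.println(apply(tags, Set.of(200), Set.of(), true));      // (200)
    }
}
```

Adding a value that is already present leaves the set unchanged, which is why concurrent writers adding the same value resolve cleanly.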

Conditional

Performs a delta conditionally based on whether a condition is still true at the time the delta is evaluated (which, remember, is every time the object is read). Conditional deltas are essential for making changes that resolve cleanly when multiple writers are concurrently updating an object. One actor can read an object then write an update where the update is conditioned upon the object being unchanged since the read.

Conditional deltas take the form of if <condition> then <delta> elif <condition> then <delta> else <delta> end where the elif and else parts are optional and elifs may be chained together as long as desired.

See the “Conditions” section below for the available condition tests.

Mark a review as APPROVED, but only if new content hasn’t been submitted since moderation began (assumes the submission app sets submissionTxId to a random hash every time new updates are submitted by the end user):

Java

Deltas.conditional(
        Conditions.mapBuilder()
                .contains("submissionTxId", "1d67813cd2329e30dbb58aa9d7c901a1")
                .build(),
        Deltas.mapBuilder()
                .put("status", "APPROVED")
                .build())

Delta Syntax

if {..,"submissionTxId":"1d67813cd2329e30dbb58aa9d7c901a1"} then {..,"status":"APPROVED"} end
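
The if / elif / else form can be thought of as a first-match chain. The sketch below models that evaluation order in plain Java; it is illustrative only, and the ConditionalChainSketch, Branch, and evaluate names are hypothetical rather than part of the EmoDB API. A missing else branch is modeled as a no-op, which matches the note below that the conditional delta builder uses the noop delta internally.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical model of "if c1 then d1 elif c2 then d2 else d3 end"; not EmoDB code.
final class ConditionalChainSketch {

    // One "if/elif <condition> then <delta>" pair.
    static final class Branch<T> {
        final Predicate<T> condition;
        final UnaryOperator<T> delta;

        Branch(Predicate<T> condition, UnaryOperator<T> delta) {
            this.condition = condition;
            this.delta = delta;
        }
    }

    // Applies the delta of the first branch whose condition holds, else the fallback.
    static <T> T evaluate(T current, List<Branch<T>> branches, UnaryOperator<T> otherwise) {
        for (Branch<T> branch : branches) {
            if (branch.condition.test(current)) {
                return branch.delta.apply(current);
            }
        }
        return otherwise.apply(current);
    }

    public static void main(String[] args) {
        List<Branch<String>> branches = List.of(
                new Branch<String>(s -> "PENDING".equals(s), s -> "APPROVED"),
                new Branch<String>(s -> "FLAGGED".equals(s), s -> "REJECTED"));

        // No else branch: fall back to a no-op.
        System.out.println(evaluate("PENDING", branches, UnaryOperator.identity())); // APPROVED
        System.out.println(evaluate("FLAGGED", branches, UnaryOperator.identity())); // REJECTED
        System.out.println(evaluate("DELETED", branches, UnaryOperator.identity())); // DELETED
    }
}
```

Keep in mind that in EmoDB this evaluation happens every time the object is read, so the branch that is taken can change as earlier deltas replicate in.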

Noop

Makes no change. The noop delta is used internally by the conditional delta and map delta builders.

Java

Deltas.noop()

Delta Syntax

..

Conditions

Deltas or parts of deltas may be applied conditionally, based on the current state of the JSON document.

Conditional deltas are not designed to support arbitrary transformations of JSON documents in the System of Record. Instead, they are designed to help resolve the most common concurrent write conflict situations. This keeps the system simple and reliable.

Equal

Compare the current JSON object for equality against a specified JSON literal. Maps and lists are compared recursively. Order of keys is irrelevant for maps. Order of entries in lists is significant. Floating point values must match exactly (i.e. Java Double.equals()). Comparisons are null-safe.

Note that this operation is not appropriate for sets since sets are represented as lists and therefore order-dependent. To compare two sets for equality use containsOnly instead.

Java

Conditions.equal(null)
Conditions.equal(5)
Conditions.equal("APPROVED")
Conditions.equal(ImmutableMap.<String,Object>of("rating", 5, "text", "Love it!"))

Delta Condition Syntax

null
5
"APPROVED"
{"rating":5,"text":"Love it!"}

Intrinsic

Compare an intrinsic field such as ~deleted against a specified JSON literal. Note that intrinsic values are not otherwise considered part of the JSON delta object model and other conditional operations will ignore them.

The ~table intrinsic is often useful in Databus subscription filters.

Java

Conditions.intrinsic(Intrinsic.ID, "abc123")
Conditions.intrinsic(Intrinsic.TABLE, "review:testcustomer")
Conditions.intrinsic(Intrinsic.DELETED, true)
Conditions.intrinsic(Intrinsic.FIRST_UPDATE_AT, "2012-06-22T20:11:53.473Z")
Conditions.intrinsic(Intrinsic.LAST_UPDATE_AT, "2012-06-22T20:12:09.679Z")

Delta Condition Syntax

intrinsic("~id":"abc123")
intrinsic("~table":"review:testcustomer")
intrinsic("~deleted":true)
intrinsic("~firstUpdateAt":"2012-06-22T20:11:53.473Z")
intrinsic("~lastUpdateAt":"2012-06-22T20:12:09.679Z")

Is

Performs a type check against the current JSON object.

A deleted object is considered “undefined.” Anything else is considered “defined.”

There is no test that distinguishes between integer and floating point numbers. Because floating point numbers may be converted implicitly to ints or longs based on whether their formatted JSON representation requires a decimal point, it’s usually not safe to condition any deltas based on integer vs. floating point.

As previously noted sets are represented as arrays in JSON, so testing if an object is a set is done by performing a list or array check.

Java

Conditions.isUndefined()
Conditions.isDefined()
Conditions.isBoolean()
Conditions.isNumber()
Conditions.isString()
Conditions.isList()
Conditions.isMap()

Delta Condition Syntax

is(undefined)
is(defined)
is(bool)
is(num)
is(string)
is(array)
is(object)

Note that the Java API uses Java-centric isList and isMap names for those tests but the string syntax uses the JSON-oriented terms array and object.

Comparison

Compares a numeric or string field in the current JSON to a constant value. There are four options for comparison: greater than (“gt”), greater than or equal (“ge”), less than (“lt”) and less than or equal (“le”).

Java

Conditions.gt(5)
Conditions.ge(98.6)
Conditions.lt("2014-01-28T11:21:41.058Z")
Conditions.le(50)

Delta Condition Syntax

gt(5)
ge(98.6)
lt("2014-01-28T11:21:41.058Z")
le(50)

Note that all four variations will evaluate to false if any of the following conditions are true:

  1. The current JSON object’s value is not defined.
  2. The current JSON object’s value is null.
  3. The current JSON object’s value is a string and the constant is numeric, or vice versa.
  4. The current JSON object’s value is not one of the supported types (for example, a JSON object or array).
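
The four false cases above can be captured in a short sketch of gt. This is a hypothetical model and not EmoDB's comparison code: undefined and null are both represented by Java null, numbers are compared as doubles (which loses precision for very large longs), and strings are compared with String.compareTo, all of which are simplifying assumptions.

```java
// Hypothetical model of the gt(...) condition; not EmoDB code.
final class ComparisonSketch {

    // Returns true only when both values are numbers, or both are strings,
    // and the current value is strictly greater than the constant.
    static boolean gt(Object current, Object constant) {
        if (current instanceof Number && constant instanceof Number) {
            // Assumption: comparing as doubles is precise enough for this sketch.
            double left = ((Number) current).doubleValue();
            double right = ((Number) constant).doubleValue();
            return Double.compare(left, right) > 0;
        }
        if (current instanceof String && constant instanceof String) {
            // Assumption: plain String.compareTo ordering.
            return ((String) current).compareTo((String) constant) > 0;
        }
        // Covers cases 1-4: undefined, null, mismatched types, unsupported types.
        return false;
    }

    public static void main(String[] args) {
        System.out.println(gt(6, 5));                  // true
        System.out.println(gt(null, 5));               // false: undefined or null
        System.out.println(gt("6", 5));                // false: string vs. number
        System.out.println(gt(java.util.List.of(), 5)); // false: unsupported type
    }
}
```

One consequence is that not(gt(5)) is not the same as le(5): for a null or missing value, gt(5) and le(5) are both false, so not(gt(5)) is true while le(5) is false.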

Contains

Tests whether a list or set contains one or more values. Always returns false if the current JSON object is neither a list nor a set. There are three variations on containment testing: “any” returns true if the object contains at least one value, “all” returns true if the object contains all values, and “only” returns true if the object contains exactly all values with no additional values. contains(value) is a shortcut for containsAll(value).

Java

Conditions.contains(18)
Conditions.containsAll("faster", "cheaper")
Conditions.containsAny("med", "large", "x-large")
Conditions.containsOnly("short", "sweet")

Delta Condition Syntax

contains(18)
containsAll("faster","cheaper")
containsAny("med","large","x-large")
containsOnly("short","sweet")
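
The difference between the three variations is easiest to see side by side. The following sketch models them on a java.util.List; it is a simplified illustration with hypothetical names, not EmoDB's implementation, and it relies on Java equals, so an Integer 18 and a Long 18 would not be considered the same value here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Hypothetical model of the containment conditions; not EmoDB code.
final class ContainsSketch {

    // True if the current value is a list holding at least one of the values.
    static boolean containsAny(Object current, Object... values) {
        if (!(current instanceof List)) {
            return false;
        }
        List<?> list = (List<?>) current;
        for (Object value : values) {
            if (list.contains(value)) {
                return true;
            }
        }
        return false;
    }

    // True if the current value is a list holding every one of the values.
    static boolean containsAll(Object current, Object... values) {
        return current instanceof List
                && ((List<?>) current).containsAll(Arrays.asList(values));
    }

    // True if the current value is a list holding exactly the values, ignoring order.
    static boolean containsOnly(Object current, Object... values) {
        return current instanceof List
                && new HashSet<Object>((List<?>) current).equals(new HashSet<>(Arrays.asList(values)));
    }

    public static void main(String[] args) {
        List<String> sizes = List.of("small", "med");
        System.out.println(containsAny(sizes, "med", "large"));  // true
        System.out.println(containsAll(sizes, "med", "large"));  // false
        System.out.println(containsOnly(sizes, "med", "small")); // true
    }
}
```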

Like

Performs a wildcard string check against the current JSON object. If the object is not a String then the result is always false.

Java

Conditions.like("review:*")         // Would match "review:testclient"
Conditions.like("*:testclient")     // Would match "review:testclient"
Conditions.like("*escaped\\*")      // Would match "example_of_escaped*"

Delta Condition Syntax

like("review:*")
like("*:testclient")
like("*escaped\\*")
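
For readers who want to reason about which strings a pattern matches, here is one way to model the wildcard rules by translating the pattern to a regular expression. This is a sketch, not EmoDB's matcher: the LikeSketch class is hypothetical, and the assumption (inferred from the examples above) is that * matches any run of characters and a backslash makes the following character literal.

```java
import java.util.regex.Pattern;

// Hypothetical model of the like(...) condition; not EmoDB code.
final class LikeSketch {

    static boolean like(Object current, String pattern) {
        if (!(current instanceof String)) {
            return false; // non-strings never match
        }
        StringBuilder regex = new StringBuilder();
        StringBuilder literal = new StringBuilder();
        for (int i = 0; i < pattern.length(); i++) {
            char c = pattern.charAt(i);
            if (c == '\\' && i + 1 < pattern.length()) {
                literal.append(pattern.charAt(++i)); // escaped character is literal
            } else if (c == '*') {
                flush(literal, regex);
                regex.append(".*");                  // wildcard: any run of characters
            } else {
                literal.append(c);
            }
        }
        flush(literal, regex);
        return Pattern.compile(regex.toString(), Pattern.DOTALL)
                .matcher((String) current)
                .matches();
    }

    // Appends the pending literal text to the regex, quoted so it has no special meaning.
    private static void flush(StringBuilder literal, StringBuilder regex) {
        if (literal.length() > 0) {
            regex.append(Pattern.quote(literal.toString()));
            literal.setLength(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(like("review:testclient", "review:*"));        // true
        System.out.println(like("example_of_escaped*", "*escaped\\*"));   // true
        System.out.println(like("example_of_escapedX", "*escaped\\*"));   // false
    }
}
```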

Map

Tests specific keys in a JSON map with specified conditions. If multiple keys are specified, all conditions must be satisfied for the condition to be true (i.e. conditions are ANDed together). Keys not mentioned are ignored.

Java

Conditions.mapBuilder().contains("type", "review").build()

Delta Condition Syntax

{..,"type":"review"}

Test whether a map contains the submissionTxId key:

Java

Conditions.mapBuilder().containsKey("submissionTxId").build()

Delta Condition Syntax

{..,"submissionTxId":+}

Test a condition on a key in a nested map:

Java

Conditions.mapBuilder()
        .matches("photo", Conditions.mapBuilder().contains("status", "APPROVED").build())
        .build()

Delta Condition Syntax

{..,"photo":{..,"status":"APPROVED"}}

And, Or

A conjunction or disjunction of multiple condition tests.

Java

Conditions.and(Conditions.isDefined(), Conditions.isMap())
Conditions.or(Conditions.isList(), Conditions.isMap())

Delta Condition Syntax

and(+,is(object))
or(is(array),is(object))

There is a Java shorthand for an or of multiple equality tests:

Java

Conditions.in("APPROVED", "REJECTED")
Conditions.or(Conditions.equal("APPROVED"), Conditions.equal("REJECTED"))

Delta Condition Syntax

or("APPROVED","REJECTED")

Note that it’s not usually necessary to use and() to test multiple keys in a map. For example, the following Databus subscription table filter will subscribe to events on all tables containing TestCustomer products or categories:

Delta Condition Syntax

{..,"type":or("product","category"),"client":"TestCustomer"}

Not

Inverts the result of a condition test.

Java

Conditions.not(Conditions.equal("APPROVED"))

Delta Condition Syntax

not("APPROVED")

Constant

Always true.

Java

Conditions.alwaysTrue()

Delta Condition Syntax

alwaysTrue()

Always false.

Java

Conditions.alwaysFalse()

Delta Condition Syntax

alwaysFalse()

Note that the delta condition syntax true and false correspond to Conditions.equal(true) and Conditions.equal(false), respectively.

Read-Modify-Write

Fundamentally, conditional deltas plus the databus come very close to supporting a read-modify-write style of update.

  1. Read the original state of the document.
  2. Compute a new version of the document.
  3. Write a conditional delta, conditioned on the original state of the document.
  4. Either the write succeeds, or eventually the write conflicts with another write, the conditional write fails, and a databus event fires for the conflicting write. When that happens, a databus listener can detect that the conditional write failed and retry the update.

Note that there may be a period of time where the conditional write appears to have succeeded because the conflicting write (perhaps from another data center) is not yet visible. Consider the following:

  1. Application writes a literal delta t1.
  2. Application in data center A writes a conditional delta t3.
  3. Concurrently, application in data center B writes delta t2 that invalidates the conditional delta t3.
  4. For a while, readers in data center A will only see t1 and t3 and think that the conditional write succeeded.
  5. However, once t2 replicates to data center A, readers will now see t1, t2, t3 in sequence and the effect of evaluating t3 will change, invalidating the conditional write.

Because of this, it can appear that the System of Record may go back and retroactively fail a conditional write. However, the databus will guarantee to fire an event whenever this happens so apps can have a chance to detect it and retry the write.
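
The t1, t2, t3 scenario above can be reproduced with a small simulation. The sketch below is a simplified model, not EmoDB code: each delta is represented as a function from the previous document to the next one, and the field names and values (text, submissionTxId, tx-1, tx-2) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical simulation of a retroactively failing conditional write; not EmoDB code.
final class RetroactiveConflictSketch {

    // t1: literal delta that writes the whole document.
    static final UnaryOperator<Map<String, Object>> T1 = doc -> {
        Map<String, Object> next = new LinkedHashMap<>();
        next.put("text", "first draft");
        next.put("submissionTxId", "tx-1");
        return next;
    };

    // t2: map delta from data center B that replaces the content.
    static final UnaryOperator<Map<String, Object>> T2 = doc -> {
        Map<String, Object> next = new LinkedHashMap<>(doc);
        next.put("text", "edited");
        next.put("submissionTxId", "tx-2");
        return next;
    };

    // t3: conditional delta from data center A, valid only while tx-1 is current.
    static final UnaryOperator<Map<String, Object>> T3 = doc -> {
        Map<String, Object> next = new LinkedHashMap<>(doc);
        if ("tx-1".equals(doc.get("submissionTxId"))) {
            next.put("status", "APPROVED");
        }
        return next;
    };

    // Evaluates the visible deltas, already in time order, into a document.
    static Map<String, Object> resolve(List<UnaryOperator<Map<String, Object>>> deltas) {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (UnaryOperator<Map<String, Object>> delta : deltas) {
            doc = delta.apply(doc);
        }
        return doc;
    }

    public static void main(String[] args) {
        // Before t2 replicates to data center A: the conditional write appears to succeed.
        System.out.println(resolve(List.of(T1, T3)).get("status"));     // APPROVED
        // After t2 replicates: t3 is re-evaluated against t1+t2 and no longer applies.
        System.out.println(resolve(List.of(T1, T2, T3)).get("status")); // null
    }
}
```

Nothing about t3 itself changes between the two reads; only the set of earlier deltas it is evaluated against does. That is why an application that needs to know the final outcome has to listen for the databus event rather than rely on the first read after the write.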

Compaction

To keep reads from becoming prohibitively slow when a large number of updates must be resolved at read time, EmoDB combines older deltas and replaces them with a single delta equal to their combined effect.

However, there is a major challenge that comes with compaction in a multi-data center cluster. When is it ok to compact deltas for a given row in a data center? Specifically, what if an older delta arrives after we are done compacting? If we arbitrarily decide to compact deltas, say every five minutes, then we run the risk of losing deltas that may still be in flight from a different data center.

To solve this issue, EmoDB computes the Full Consistency Timestamp (FCT), which is defined as the time t before which all deltas are fully consistent on all nodes. Said another way, no deltas with timestamps before the FCT will arrive from other data centers.

Any delta prior to the FCT gets compacted without any fear of data loss. A special “compaction record” is created that tracks compaction activity and ensures that intrinsics like ~version, ~firstUpdateAt etc. are maintained correctly.

The DataStore.getTimeline() method will return the set of deltas currently stored in the System of Record. At first glance, it may look like you can use this to see all the individual deltas that were ever written for a document. But remember that, eventually, old deltas will be compacted together. So you can’t rely on the System of Record to keep a complete history of the life of a document. Only audit records are never compacted. If there is historical information you must keep, you can store it in an audit record. By default, EmoDB does keep a history of the past two days of compacted deltas. This value can be tweaked or disabled by configuring the historyTtl property accordingly.

The compaction process is done opportunistically whenever documents are read from the System of Record. If a document is only written and never read, it will never be compacted.
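
The role of the FCT can be illustrated with a toy compaction routine. This is a sketch under simplifying assumptions and not how EmoDB stores or compacts data: the CompactionSketch and FieldDelta names are hypothetical, timestamps are plain longs rather than time UUIDs, each delta is reduced to the fields it sets, and the compaction record that preserves intrinsics such as ~version is not modeled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of FCT-bounded compaction; not EmoDB code.
final class CompactionSketch {

    // Stand-in for a stored delta: a write time plus the fields it sets.
    static final class FieldDelta {
        final long timestamp;
        final Map<String, Object> fields;

        FieldDelta(long timestamp, Map<String, Object> fields) {
            this.timestamp = timestamp;
            this.fields = fields;
        }
    }

    // Folds every delta older than the FCT into one combined delta and
    // leaves newer deltas untouched, since older writes may still arrive for them.
    static List<FieldDelta> compact(List<FieldDelta> deltas, long fct) {
        List<FieldDelta> sorted = new ArrayList<>(deltas);
        sorted.sort(Comparator.comparingLong((FieldDelta d) -> d.timestamp));

        Map<String, Object> combined = new LinkedHashMap<>();
        long lastCompacted = Long.MIN_VALUE;
        boolean anyCompacted = false;
        List<FieldDelta> result = new ArrayList<>();
        for (FieldDelta delta : sorted) {
            if (delta.timestamp < fct) {
                combined.putAll(delta.fields);   // same effect as applying it in order
                lastCompacted = delta.timestamp;
                anyCompacted = true;
            } else {
                result.add(delta);
            }
        }
        if (anyCompacted) {
            result.add(0, new FieldDelta(lastCompacted, combined));
        }
        return result;
    }
}
```

For example, compacting deltas written at times 1, 2 and 10 with an FCT of 5 leaves two deltas: one combined delta stamped 2 and the untouched delta at 10. Resolving the compacted list yields the same document as resolving the original three.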