Promise = require 'bluebird'
AsyncPool = require 'async-pool'
label = ->
  # Resolve to the base64 encoding of whatever `psbytes(9)` yields.
  # NOTE(review): `psbytes` is not defined in the visible source; it is assumed
  # to return a promise for a Buffer-like value — confirm where it comes from.
  psbytes(9).then (buf) ->
    buf.toString 'base64'
class Deadlock extends Error
  # Error used to reject operations of transactions that wait on each other.
  #
  # Extends Error so `instanceof Error` checks succeed and promise rejections
  # carry a genuine error object.
  #
  # message - human-readable description of the conflict.
  constructor: (message) ->
    # Newer CoffeeScript compilers require `super` before `this` is touched;
    # older ones invoke Error as a plain function, which does not populate
    # `this`, so the fields are assigned explicitly afterwards.
    super(message)
    @message = message
    @name = "Deadlock"
    # captureStackTrace is a V8 extension; skip it where it is unavailable.
    Error.captureStackTrace?(this, Deadlock)
A local memory adapter for testing.
The adapter stores a hash of objects and a hash of transactions. Each transaction is another memory adapter. When an object is used in a transaction, the object is locked by having its key changed to point to the transaction rather than the object, and the object is put in the transaction's objects.
When an object is set that is already in a transaction, the call blocks, and its promise is put in a queue to be released when that transaction continues.
When the transaction commits or rolls back, the queue is released.
Promise = require 'bluebird'
AsyncPool = require 'async-pool'
label = ->
  # Resolve to the base64 encoding of whatever `psbytes(9)` yields.
  # NOTE(review): `psbytes` is not defined in the visible source; it is assumed
  # to return a promise for a Buffer-like value — confirm where it comes from.
  psbytes(9).then (buf) ->
    buf.toString 'base64'
class Deadlock extends Error
  # Error used to reject operations of transactions that wait on each other.
  #
  # Extends Error so `instanceof Error` checks succeed and promise rejections
  # carry a genuine error object.
  #
  # message - human-readable description of the conflict.
  constructor: (message) ->
    # Newer CoffeeScript compilers require `super` before `this` is touched;
    # older ones invoke Error as a plain function, which does not populate
    # `this`, so the fields are assigned explicitly afterwards.
    super(message)
    @message = message
    @name = "Deadlock"
    # captureStackTrace is a V8 extension; skip it where it is unavailable.
    Error.captureStackTrace?(this, Deadlock)
A memory adapter represents both the underlying key/value store and a transaction.
Aside from the transaction methods, the public interface is just `get` and `set`. A memory
adapter should be given a unique name. The
class MemoryAdapter
  constructor: (@options, @insideOf)->
    # Set up an empty store (or transaction) and its pool of clients.
    #
    # options  - optional hash; `name` (default 'memory') and `nclients`
    #            (default 2) are read from it.
    # insideOf - optional enclosing adapter that `get` and `set` consult.
    #
    # Read the name through `@options`: with the `@options` parameter
    # shorthand a bare `options` is not reliably the parameter, in which case
    # the configured name would be silently ignored.
    @name = @options?.name ? 'memory'
    # key -> stored value, or key -> the MemoryAdapter currently holding it.
    @objects = {}
    @nclients = @options?.nclients ? 2
    # Operations queued behind this adapter (see `_addWaiting`).
    @waiting = []
    # Names of transactions recorded as waiting on this one.
    @transactionsWaiting = {}
    # Transactions opened through `openTransaction`.
    @transactions = []
    # `by 1` forces an ascending range, so `nclients: 0` builds no clients
    # instead of counting down from 1 to 0.
    clients = (new MemoryClient(this, i) for i in [1..@nclients] by 1)
    @clients = AsyncPool clients
Get the value for `key`, or `undefined` if it hasn’t been set.
If it is inside the transaction already, it may have been taken by a sub-transaction. If so, wait for it, and return when that commits.
If it is not in the transaction, check for it in the super transaction and return that value (after possibly having waited for it), including it in this transaction as well.
get: (key)->
  # Resolve to the value stored under `key`, or undefined if it is not set.
  #
  # - If the slot holds a MemoryAdapter, the key is held by that transaction:
  #   a 'get' is queued behind it and that promise is returned.
  # - If nothing is stored here and there is an enclosing adapter, the value
  #   is read from it and remembered locally.
  # - Otherwise the locally stored value is returned.
  #
  # Returns a promise.
  current = @objects[key]
  if current instanceof MemoryAdapter
    # Queue a read (not a write): replaying a 'set' without a value would
    # overwrite the key with undefined.
    return current._addWaiting {
      op: 'get', transaction: this,
      key: key
    }
  # Only fall back to the enclosing adapter when nothing is stored locally,
  # so local writes are not clobbered by the parent's value.
  if current is undefined and @insideOf?
    return @insideOf.get(key).bind(this).then (value)->
      @objects[key] = value
      return value
  # Keep the result asynchronous, as the other branches are.
  return Promise.resolve(current).delay(0)
Set `value` for `key`.
If we are inside another transaction, lock `key` in that transaction first, then set it in this transaction, possibly after waiting for a sub-transaction.
set: (key, value)->
  # Store `value` under `key`.
  #
  # When there is an enclosing adapter, this adapter is first stored under
  # `key` there, which marks the key as held by this transaction. If the
  # local slot is held by a different adapter, the write is queued behind it.
  #
  # Returns a promise: for this adapter after a direct write, or whatever
  # `_addWaiting` returns when the write is queued.
  Promise.resolve().bind(this).then ->
    if @insideOf?
      @insideOf.set(key, this)
  .then ->
    current = @objects[key]
    # `current isnt value`: a transaction re-locking a key it already holds
    # must not be queued behind itself.
    if current instanceof MemoryAdapter and current isnt value
      return current._addWaiting {
        op: 'set', transaction: this,
        key: key, value: value
      }
    @objects[key] = value
    # Resolve to the adapter itself; `this` is the adapter because the chain
    # is bound.
    return Promise.resolve(this).delay(0)
Add an operation waiting for this transaction to commit. Besides putting it on the queue, we compile the complete list of transactions waiting and check that we aren’t waiting on ourselves indirectly. If we are, return a rejected promise (deadlock). Otherwise return a promise that resolves to the result of the operation, to be triggered when we commit.
_addWaiting: (opHash)->
  # Queue an operation behind this transaction.
  #
  # opHash - {op, key, value, transaction}: `transaction` is the adapter that
  #          wants to run `op` ('get' or 'set') once this one is released.
  #
  # Returns a promise rejected with Deadlock when the requester is this
  # adapter or is already recorded as being waited on by this adapter;
  # otherwise a promise settled with the outcome of the operation when
  # `_releaseWaiting` replays it.
  transaction = opHash.transaction
  # Fat arrow so `@name` refers to this adapter inside the helper.
  _deadlock = =>
    Promise.reject new Deadlock(
      "#{transaction.name} and #{@name} wait for each other")
  if transaction.name == @name
    return _deadlock()
  for name of transaction.transactionsWaiting
    if name == @name
      return _deadlock()
  # Merge as a separate pass so state stays clean if the deadlock is handled.
  # The requester itself is recorded too; without it the waiting sets stay
  # empty and an indirect cycle can never be detected.
  @transactionsWaiting[transaction.name] = true
  for name of transaction.transactionsWaiting
    @transactionsWaiting[name] = true
  return new Promise (resolve, reject) =>
    # Keep the settle functions with the operation so the release step can
    # hand the result back to the original caller.
    opHash.resolve = resolve
    opHash.reject = reject
    @waiting.push opHash

_releaseWaiting: ()->
  # Replay every queued operation on the adapter that requested it and settle
  # the promise `_addWaiting` handed out. Resolves to this adapter.
  #
  # The queue and the waiting set are reset first, so an operation queued
  # while replaying is not processed twice.
  pending = @waiting
  @waiting = []
  @transactionsWaiting = {}
  Promise.resolve(pending).bind(this).map ({op, key, value, transaction, resolve, reject})->
    # If the requester's slot still points at this (finished) transaction,
    # clear it; otherwise the replay would queue behind this adapter again
    # and never run. NOTE(review): the value the key had before it was locked
    # is not kept anywhere visible here, so such a key reads as unset.
    if transaction.objects[key] is this
      delete transaction.objects[key]
    Promise.resolve().then ->
      transaction[op](key, value)
    .then(resolve, reject)
  .then ->
    return this
getRawClient: ()->
  # Hand back whatever the client pool's `use()` yields.
  # NOTE(review): presumably a pooled MemoryClient (or a disposer for one) —
  # confirm against the async-pool API.
  pooled = @clients.use()
  pooled
openTransaction: (client, name)->
  # Open a transaction for `client` and push it on the client's stack.
  #
  # client - object with a `transactions` array (e.g. a MemoryClient).
  # name   - optional transaction name; defaults to this adapter's name plus
  #          the client's `name`, or its number `n` when it has no name.
  #
  # Returns a promise for this adapter.
  #
  # NOTE(review): the new adapter is created without an enclosing adapter, so
  # reads inside the transaction do not fall back to this store — confirm
  # that this is intended.
  name ?= "#{@name}-#{client.name ? client.n}"
  transaction = new MemoryAdapter({name: name})
  # Record the transaction with a real `push` call; the array is created on
  # demand in case it was never initialised.
  (@transactions ?= []).push transaction
  client.transactions.push transaction
  return Promise.resolve(this)
commitTransaction: (client)->
  # Commit the client's innermost transaction.
  #
  # The transaction's stored objects are written to the transaction that
  # remains on top of the client's stack, or to this store when none remains;
  # then the operations queued behind the transaction are released.
  #
  # Returns a promise for this adapter (also when there is nothing to commit).
  transaction = client.transactions.pop()
  return Promise.resolve(this) unless transaction?
  target = client.transactions[client.transactions.length - 1] ? this
  # Iterate the stored objects, not the adapter's own properties.
  Promise.resolve(Object.keys(transaction.objects)).bind(this).map (k)->
    # Drop a lock the committing transaction holds on the target, otherwise
    # the write below would queue behind the very transaction being committed.
    if target.objects[k] is transaction
      delete target.objects[k]
    target.set k, transaction.objects[k]
  .then ->
    transaction._releaseWaiting()
  .then ->
    this
rollbackTransaction: (client)->
  # Drop the client's innermost transaction without copying anything out of
  # it, then release whatever was queued behind it. With no open transaction
  # this resolves to the adapter straight away.
  discarded = client.transactions.pop()
  if discarded?
    discarded._releaseWaiting()
  else
    Promise.resolve(this)
openSubTransaction: (client, name)->
  # A sub-transaction is opened exactly like any other transaction.
  return @openTransaction client, name
commitSubTransaction: (client, name)->
  # A sub-transaction is committed exactly like any other transaction.
  return @commitTransaction client, name
rollbackSubTransaction: (client, name)->
  # Roll back the client's innermost (sub-)transaction.
  # Delegates to rollbackTransaction; delegating to commitTransaction would
  # persist the very changes a rollback is meant to discard.
  @rollbackTransaction(client, name)
wrap: ()->
  # Replace this adapter's `get` and `set` with versions that run against a
  # client obtained from `xwrap.client()` via `Promise.using`.
  # NOTE(review): `xwrap` is not defined in the visible source — confirm
  # where it is provided.
  viaClient = (work)->
    Promise.using xwrap.client(), work
  @get = (key)->
    viaClient (client)->
      client.get(key)
  # Last expression, so the new `set` function is also the return value.
  @set = (key, value)->
    viaClient (client)->
      client.set(key, value)
class MemoryClient
  # A client of a memory adapter. It keeps a stack of open transactions and
  # routes reads and writes to the innermost one, or to the adapter itself
  # when no transaction is open.
  #
  # adapter - the adapter this client belongs to.
  # n       - the client's number.
  constructor: (@adapter, @n)->
    @transactions = []

  # Innermost open transaction, or the adapter when the stack is empty.
  _current: ->
    @transactions[@transactions.length - 1] ? @adapter

  # Read `key` from the current target; returns whatever its `get` returns.
  get: (key)->
    @_current().get(key)

  # Write `value` under `key` on the current target; returns whatever its
  # `set` returns.
  set: (key, value)->
    @_current().set(key, value)
Build and return an adapter asynchronously.
initialize = (options)->
  # Construct a MemoryAdapter from `options` and hand it back through an
  # already-resolved promise.
  adapter = new MemoryAdapter options
  Promise.resolve adapter

exports.initialize = initialize