DynamoDB is a non-blocking asynchronous storage, and it is great. Mostly. However, sometimes you just need to ensure that only one instance of script accesses a certain share of information in your table. In my case it was operation or purgin+rebuilding records, which under race conditions would result in a massive data loss.
So, I decided to build a semaphore. I inited the following schema:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
/** * Table 'spectron_semaphores' */ try { $db->describeTable([ 'TableName' => TBL_PREFIX . 'semaphores' ]); } catch (\Aws\DynamoDb\Exception\DynamoDbException $e) { if ($e->getAwsErrorCode() != 'ResourceNotFoundException') { trigger_error($e->getMessage()); } $db->createTable([ 'TableName' => TBL_PREFIX . 'semaphores', 'KeySchema' => [ [ 'AttributeName' => 'semaphoreName', 'KeyType' => 'HASH' ], ], 'AttributeDefinitions' => [ [ 'AttributeName' => 'semaphoreName', 'AttributeType' => 'S' ], [ 'AttributeName' => 'userId', 'AttributeType' => 'S' ], [ 'AttributeName' => 'expires', 'AttributeType' => 'N' ], ], 'GlobalSecondaryIndexes' => [ [ 'IndexName' => 'User', 'KeySchema' => [ [ 'AttributeName' => 'userId', 'KeyType' => 'HASH' ] ], 'Projection' => [ 'ProjectionType' => 'ALL', ], 'ProvisionedThroughput' => [ 'ReadCapacityUnits' => 1, 'WriteCapacityUnits' => 1, ] ], [ 'IndexName' => 'Expires', 'KeySchema' => [ [ 'AttributeName' => 'expires', 'KeyType' => 'HASH' ], ], 'Projection' => [ 'ProjectionType' => 'ALL', ], 'ProvisionedThroughput' => [ 'ReadCapacityUnits' => 1, 'WriteCapacityUnits' => 1, ] ] ], 'ProvisionedThroughput' => [ 'ReadCapacityUnits' => 1, 'WriteCapacityUnits' => 1 ] ]); } |
The basic framework in my case is Phalcon (you should try it, it is awesome). The Semaphore model has to be a singleton, or better a generator of singletons – I resorted to the latter:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
use Aws\DynamoDb\Exception\DynamoDbException; use Phalcon\Di; class Semaphore extends ModelBase { private $semaphoreName = NULL; private static $instances = []; /** * @param $name * * @return Semaphore */ public static function getInstance($name) { if (empty(self::$instances[$name])) { self::$instances[$name] = new self($name); } return self::$instances[$name]; } protected function __construct($name) { parent::__construct(); $this->tableName = TBL_PREFIX . 'semaphores'; $this->semaphoreName = $name; } public function lock() { $lockAcquired = FALSE; while (!$lockAcquired) { try { $this->insert([ 'semaphoreName' => ['S' => $this->semaphoreName], 'userId' => ['S' => Di::getDefault()->get('session')->user['key']], 'expires' => ['N' => (string) (time() + 120)], ], [ 'semaphoreName <> :semaphoreName or expires < :time', [ ':semaphoreName' => ['S' => $this->semaphoreName], ':time' => ['N' => (string) time()], ] ]); $lockAcquired = TRUE; } catch (DynamoDbException $e) { // If there is valid lock for the name if ('ConditionalCheckFailedException' == $e->getAwsErrorCode()) { // wait 5 seconds sleep(5); // Try again continue; } else { throw new \Exception($e->getMessage()); } } } } public function release() { $this->delete([ 'semaphoreName' => ['S' => $this->semaphoreName] ]); unset(self::$instances[$this->semaphoreName]); } public function __destruct() { $this->release(); } } |
Related parts of the ModelBase :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public function insert($item, $condition = []) { $batch = [ 'TableName' => $this->tableName, 'Item' => $item, ]; if (!empty($condition)) { $batch['ConditionExpression'] = $condition[0]; $batch['ExpressionAttributeValues'] = $condition[1]; } return $this->db->putItem($batch); } public function delete($id) { $this->db->deleteItem([ 'TableName' => $this->tableName, 'Key' => $id ]); } |
I used official AWS framework’s DynamoDB client for PHP for further request processing.
Note two points here:
- The non-matching condition in putItem request throws a ConditionalCheckFailedException , so you should catch and process it. And since AWS DynamoDB client does not bother issuing different classes of exceptions, defining its own codes, you should be sure to get out of the cycle if any other exception occurs.
- The cloud version, unlice local version of DynamoDB does not like integers. Seriously, it forces you to typecast your integers to string, in order to store them as “number”.
So you can use this setup as follows:
1 2 3 4 5 6 7 8 9 10 |
public function myFunction($args) { ...... $semaphore = Semaphore::getInstance('tbl_issues:' . $projectId); $semaphore->lock(); [... potentially dangerous synchronous stuff ...] $semaphore->release(); } |