    Reliably handling externally processed records in a highly concurrent environment.

    A three-step approach to handling the external processing of records.


    To reliably process records in a concurrent environment that interfaces with an external system (one not covered by the database transaction), we must adopt a "three-step approach".

    The three steps are:

    1. Take ownership of the record by marking it as "processing", and prevent other processes from doing the same by using an optimistic locking strategy.

    2. Process the record itself in the external system (payment gateway, sending an email, etc.).

    3. Record the result of the processing in the database by marking the record status as complete or error.
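
    The three steps above can be sketched in isolation as follows. This is a minimal illustration, not the library used later in this post: the names ThreeStepSketch, QueuedRecord and ExternalSystem are hypothetical, and an in-memory compare-and-set stands in for the atomic, optimistically locked database save.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the three-step approach; all names are illustrative. */
final class ThreeStepSketch {
    enum Status { QUEUED, PROCESSING, COMPLETE, ERROR }

    /** Stand-in for a database row whose status change is atomic. */
    static final class QueuedRecord {
        private final AtomicReference<Status> status =
                new AtomicReference<>(Status.QUEUED);

        /** Step 1: succeeds for exactly one caller, like an optimistic-lock save. */
        boolean takeOwnership() {
            return status.compareAndSet(Status.QUEUED, Status.PROCESSING);
        }

        /** Step 3: record the outcome of the external call. */
        void recordResult(boolean ok) {
            status.set(ok ? Status.COMPLETE : Status.ERROR);
        }

        Status status() {
            return status.get();
        }
    }

    /** Stand-in for the external system (payment gateway, mail server, ...). */
    interface ExternalSystem {
        void process(QueuedRecord record) throws Exception;
    }

    /** Returns true if this caller processed the record, false if it was skipped. */
    static boolean handle(QueuedRecord record, ExternalSystem external) {
        if (!record.takeOwnership()) {
            return false; // another process owns (or has finished) the record
        }
        boolean ok;
        try {
            external.process(record); // step 2: outside any database transaction
            ok = true;
        } catch (Exception e) {
            ok = false;
        }
        record.recordResult(ok); // step 3
        return true;
    }
}
```

    Calling handle twice on the same record processes it only once: the second call fails to take ownership and returns false without touching the external system.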

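    If a power outage or other extreme event occurs, the only records whose outcome is uncertain are those still marked as "processing": each of them may or may not have been handled by the external system, so their actual status must be checked.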
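
    A recovery job after such an event only has to look at records left in "processing". The sketch below shows one possible filter. The class StuckRecordFinder, the Row type and the grace period are assumptions added for illustration (the grace period is there to avoid flagging records that a live process is still working on); they are not part of the original code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists records whose outcome must be checked by hand. */
final class StuckRecordFinder {
    enum Status { QUEUED, PROCESSING, COMPLETE, ERROR }

    /** Hypothetical snapshot of a record row. */
    static final class Row {
        final String id;
        final Status status;
        final Instant lastChanged;

        Row(String id, Status status, Instant lastChanged) {
            this.id = id;
            this.status = status;
            this.lastChanged = lastChanged;
        }
    }

    /**
     * Returns the rows still marked PROCESSING whose last change is older
     * than the grace period. Rows in any other status are never returned.
     */
    static List<Row> findUncertain(List<Row> rows, Instant now, Duration grace) {
        List<Row> uncertain = new ArrayList<>();
        for (Row row : rows) {
            boolean stale = Duration.between(row.lastChanged, now).compareTo(grace) > 0;
            if (row.status == Status.PROCESSING && stale) {
                uncertain.add(row);
            }
        }
        return uncertain;
    }
}
```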

    Sample processing of a record in a concurrent environment

            /**
             * Handling race conditions when processing a record.
             *
             * Firstly, this process needs to take ownership of the record using
             * an optimistic locking strategy to prevent other processes from
             * grabbing the same record.
             * http://en.wikipedia.org/wiki/Optimistic_concurrency_control
             *
             * Then process the record (an email in this case).
             *
             * Finally, mark the record as completed.
             */
            MutableDataSource mds = getConnection().getMutableDataSource();
            // Find the email to be sent.
            email = (DBEmailSend) mds.findKey(gk);
            mds.markSavePoint("SEND_START"); // Mark the start point.
            for (int i = 0; true; i++) {
                try {
                    /*
                     * Record the current transaction so that if a concurrent
                     * process tries to change the email, a dirty cache
                     * exception will be thrown.
                     */
                    email.forceLockedTransaction();
                    /*
                     * Check and change the status of the email. Change the
                     * status to an intermediate status and save.
                     *
                     * The save process on this record is atomic and only one
                     * process will be able to successfully change the status
                     * to "PROCESSING".
                     */
                    String sendStatus = email.getString(DBEmailSend.DBFIELD_SEND_STATUS);
                    if (!sendStatus.equals(DBEmailSendStatus.LIST_ENTERED)
                            && !sendStatus.equals(DBEmailSendStatus.LIST_QUEUED)) {
                        throw new Exception("Send status must be ENTERED or QUEUED");
                    }
                    email.setValue(DBEmailSend.DBFIELD_SEND_STATUS, DBEmailSendStatus.LIST_PROCESSING);
                    mds.save("Email send started processing:" + email);
                } catch (DirtyCacheException dce) {
                    /*
                     * A concurrent change has been detected. Roll back to the
                     * start point and retry. If the other process has
                     * successfully taken ownership of the record we will skip.
                     */
                    mds.rollbackTo("SEND_START");
                    if (i == 9) {
                        String errorMsg = "Cannot send email: " + s + " as repeatedly failed in dirty cache ";
                        LOGGER.error(errorMsg);
                        throw new Exception(errorMsg);
                    } else {
                        // Wait a random time of up to one second before retrying.
                        long msecs = (long) (1000.0 * Math.random());
                        Thread.sleep(msecs);
                        continue;
                    }
                }
                /*
                 * The only way that we would successfully get to this step is
                 * that we have taken ownership of the record by setting the
                 * status to "PROCESSING".
                 *
                 * The actual send process will set the status to "OK" or
                 * "FAILED".
                 *
                 * If the machine is turned off or crashes at any point, the
                 * only email records whose actual status we need to check are
                 * the ones with the status "PROCESSING".
                 */
                email.send(mds);
                break;
            }
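
    The retry loop above depends on the save failing whenever another process has changed the record in the meantime. One common way to get that behaviour is a version counter checked on every save. The self-contained sketch below assumes such a counter. VersionedRow, StaleVersionException and claim are hypothetical names, and the mechanism behind DirtyCacheException in the code above may well differ.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch of optimistic locking with bounded, randomised retries. */
final class OptimisticRetrySketch {
    /** Stand-in for a "dirty cache" failure. */
    static final class StaleVersionException extends Exception {
        StaleVersionException(String message) {
            super(message);
        }
    }

    /** Row with a version counter that every successful save increments. */
    static final class VersionedRow {
        private String status = "QUEUED";
        private long version = 0;

        synchronized long version() { return version; }

        synchronized String status() { return status; }

        /** Saves only if nobody else has saved since expectedVersion was read. */
        synchronized void save(String newStatus, long expectedVersion)
                throws StaleVersionException {
            if (version != expectedVersion) {
                throw new StaleVersionException("row changed since version " + expectedVersion);
            }
            status = newStatus;
            version++;
        }
    }

    /** Returns true if ownership was taken, false if the row was not claimable. */
    static boolean claim(VersionedRow row, int maxAttempts) throws InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            long seen = row.version();
            if (!"QUEUED".equals(row.status())) {
                return false; // someone else already owns or finished the row
            }
            try {
                row.save("PROCESSING", seen);
                return true;
            } catch (StaleVersionException e) {
                if (attempt == maxAttempts) {
                    throw new IllegalStateException("gave up after " + maxAttempts + " attempts", e);
                }
                // Random pause so competing processes do not retry in lockstep.
                Thread.sleep(ThreadLocalRandom.current().nextLong(1, 50));
            }
        }
    }
}
```

    As in the original loop, a caller that loses the race either sees the changed status on its next attempt and backs off, or gives up after a fixed number of attempts.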