Recent Posts

    Authors

    Published

    Tag Cloud

    Multi-Threads: SyncBlock a replacement of the synchronized keyword

    To prevent Java deadlocks a drop in replacement class SyncBlock is intended as a drop in replacement of the keyword synchronized


    When you call the method take() on a SyncBlock object you'll block up until the specified maximum number of seconds and then an error will be thrown if you are unable to obtain the lock on this object. You must ALWAYS call release() on any sync SyncBlock that you have obtained the lock on.

    The SyncBlock differs from the keyword synchronized in that it is interruptible and that it will timeout if it blocks for too long.

    The SyncBlock enhances a normal java.lang.concurrent.Lock in that it will interrupt the blocking thread if it holds the lock for too long and if the thread holding the lock is not alive a new lock object will be created and a fatal email will be generated with the details. If a lock fails to be obtained ( which would have been a deadlock) a fatal email will be generated with the details of the two threads and the blocking thread will be interrupted and the calling thread will have an error thrown.

    Please see below an example of how to use, the associated test cases and the code listing itself.

    com/aspc/DBObj/VirtualDB.java

    6686     /**
    6687      * notify all the class listeners 
    6688      * @param type the type of change MODIFY,DELETE or CREATE
    6689      * @param gk the global key to notify of. 
    6690      */
    6691     private void notifyDBClassListeners( final String type, final GlobalKey gk)
    6692     {
    6693         String key = gk.getClassId().toString();
    6694         
    6695         /**
    6696          * DEADLOCK found when this was a synchronized block. 
    6697          * now we are using a SyncBlock lock object which will timeout after 2 minutes if not successful.
    6698          * 
    6699          * Once you have taken the lock the next statement must be the start of the try block so we never leave this 
    6700          * section without releasing the lock.
    6701          */
    6702         dbClassListenersLock.take();
    6703         try
    6704         {
    6705             ArrayList list = (ArrayList)dbClassListeners.get( key);
    6706 
    6707             if(  list != null)
    6708             {
    6709                 for( int i = 0; i < list.size(); i++)
    6710                 {
    6711                     DBClassListener listener;
    6712 
    6713                     listener = (DBClassListener)list.get( i);
    6714 
    6715                     if( type.equals( DBData.NOTIFY_MODIFIED))
    6716                     {
    6717                         listener.eventObjectModified( gk, this);
    6718                     }
    6719                     else if( type.equals( DBData.NOTIFY_DELETED))
    6720                     {
    6721                         listener.eventObjectDeleted( gk, this);
    6722                     }
    6723                     else if( type.equals( DBData.NOTIFY_CREATED))
    6724                     {
    6725                         listener.eventObjectCreated( gk, this);
    6726                     }
    6727                     else
    6728                     {
    6729                         LOGGER.error( "Wrong type:" + type);
    6730                     }
    6731                 }
    6732             }
    6733         }
    6734         catch( Throwable t)
    6735         {
    6736             LOGGER.warn( "ignored exception in listener", t); // Sr, 12/05/2005 Bug #5224
    6737         }
    6738         finally
    6739         {
    6740             /**
    6741              * Always release the lock if obtained. 
    6742              */
    6743             dbClassListenersLock.release();
    6744         }
    6745     }

    com/aspc/remote/util/misc/SyncBlock.java

    107     /** 
    108      * release the lock
    109      */
    110     public void release()
    111     {
    112         syncLock.unlock();
    113     }
    114     
    115     /**
    116      * take the lock and throw an error if you can't get it. 
    117      */
    118     public void take()
    119     {
    120         try 
    121         {
    122             SyncLock tempLock = syncLock;
    123             if (syncLock.tryLock(blockSeconds, TimeUnit.SECONDS) == false) 
    124             {
    125                 Thread ownerThread = syncLock.getOwner();
    126                 
    127                 if( ownerThread.isAlive() == false)
    128                 {
    129                     synchronized( this)
    130                     {
    131                         if( tempLock == syncLock)
    132                         {
    133                             syncLock = new SyncLock();
    134                         }
    135                     }
    136 
    137                     LOGGER.fatal(this + " never released by " + ownerThread);
    138                     take();
    139                     return;
    140                 }
    141                 
    142                 StringBuilder sb = new StringBuilder( toString());
    143                 sb.append("\n");
    144                 Thread currentThread = Thread.currentThread();
    145                 
    146                 sb.append("Failed to get lock for thread: " +  currentThread + "\n");
    147 
    148                 for (StackTraceElement ste : currentThread.getStackTrace())
    149                 {
    150                     sb.append("\t" + ste + "\n");
    151                 }
    152                 
    153                 if( ownerThread != null)
    154                 {                    
    155                     sb.append("\nLock held by thread: " +  ownerThread + "\n");
    156 
    157                     for (StackTraceElement ste : ownerThread.getStackTrace())
    158                     {
    159                         sb.append("\t" + ste + "\n");
    160                     }
    161                     sb.append("Interrupting holding thread");
    162                     ownerThread.interrupt();
    163                 }
    164                 else
    165                 {
    166                     sb.append("NO OWNER THREAD found");
    167                 }
    168                 
    169                 LOGGER.fatal(sb.toString());
    170 
    171                 throw new DataBaseError("could not get the lock on: " + name);
    172             }
    173         } 
    174         catch (InterruptedException ex) 
    175         {
    176             Thread.interrupted();
    177             LOGGER.warn( "could not take lock on " + name, ex);
    178             Thread.currentThread().interrupt();
    179             throw new DataBaseError("could not get the lock on: " + name, ex);
    180         }
    181     }
    182     
    183     class SyncLock extends ReentrantLock
    184     {
    185         public SyncLock( )
    186         {
    187             super( true);
    188         }
    189         /**
    190          * get the owner thread
    191          * @return the owner thread
    192          */
    193         @Override
    194         public Thread getOwner()//NOPMD
    195         {
    196             return super.getOwner();
    197         }
    198     }
    

    com/aspc/remote/util/misc/selftest/TestSyncBlock.java

     88 
     89     /**
     90      * check we recover from a lock that is never released. 
     91      */
     92     public void testNeverReleased() throws InterruptedException
     93     {
     94         final SyncBlock block = new SyncBlock( "never release", 2);
     95         
     96         Runnable r = new Runnable( )
     97         {
     98             public void run() 
     99             {
    100                 block.take();
    101             }            
    102         };
    103         Thread t = new Thread( r);
    104         t.start();
    105         
    106         t.join( 120000);
    107         
    108         block.take();
    109     }
    110 
    111     /**
    112      * check that we actually do block
    113      */
    114     @SuppressWarnings("empty-statement")
    115     public void testBlock() throws InterruptedException
    116     {
    117         final SyncBlock block = new SyncBlock( "long time", 10);
    118         
    119         Runnable r = new Runnable( )
    120         {
    121             public void run() 
    122             {
    123                 block.take();
    124                 try
    125                 {
    126                     Thread.sleep(120000);
    127                 }
    128                 catch (InterruptedException ex)
    129                 {
    130                     LOGGER.warn("interrupted");
    131                 }
    132                 finally
    133                 {
    134                     block.release();
    135                 }
    136             }            
    137         };
    138         Thread t = new Thread( r);
    139         t.start();
    140         
    141         t.join( 1000);
    142         
    143         try
    144         {
    145             block.take();
    146             fail( "should not succeed");
    147         }
    148         catch( Throwable tw)
    149         {
    150             ;// this is good
    151         }
    152         t.interrupt();
    153         
    154         t.join( 5000);
    155         block.take();
    156     }
    157     
    158     
    159     /**
    160      * check that deadlocks are handled
    161      * @throws Exception a test failure
    162      */
    163     public void testDeadlockHandled() throws Exception
    164     {
    165         a=new A();
    166         b=new B();
    167         Thread at = new Thread( a);
    168 
    169         at.start();
    170         Thread bt = new Thread( b);
    171 
    172         bt.start();
    173 
    174         long start = System.currentTimeMillis();
    175         while( start + 120000 > System.currentTimeMillis())
    176         {
    177             if( a.calling && b.calling ) break;
    178             Thread.sleep(100);
    179         }
    180 
    181         synchronized( marker)
    182         {
    183             marker.notifyAll();
    184         }
    185         LOGGER.info("waiting for detection");
    186         at.join(240000);
    187         bt.join(240000);
    188 
    189         if( a.theException == null && b.theException == null)
    190         {
    191             fail( "The threads were not interrupted");
    192         }
    193 
    194         assertFalse( "should have finished",  at.isAlive());
    195         assertFalse( "should have finished",  bt.isAlive());
    196     }
    197 
    198     class A implements Runnable
    199     {
    200         private final SyncBlock block = new SyncBlock("A block", 2);
    201         boolean calling;
    202         Throwable theException;
    203 
    204         public void run()
    205         {
    206             try
    207             {
    208                 callB();
    209             }
    210             catch( Throwable e)
    211             {
    212                 theException = e;
    213                 LOGGER.warn( "got cancelled", e);
    214             }
    215         }
    216 
    217         public void hello()
    218         {
    219             block.take();
    220             try
    221             {
    222                 LOGGER.info("hello A");
    223             }
    224             finally
    225             {
    226                 block.release();
    227             }
    228         }
    229 
    230         private void callB() throws InterruptedException
    231         {
    232             block.take();
    233             try
    234             {
    235                 calling=true;
    236                 synchronized( marker)
    237                 {
    238                     marker.wait(120000);
    239                 }
    240                 LOGGER.info("call B");
    241                 b.hello();
    242             }
    243             finally
    244             {
    245                 block.release();
    246             }
    247         }
    248     }
    249 
    250     class B implements Runnable
    
    251     {
    252         boolean calling;
    253         Throwable theException;
    254         private final SyncBlock block = new SyncBlock("A block", 2);
    255         
    256         public void run()
    257         {
    258             try
    259             {
    260                 callA();
    261             }
    262             catch( Throwable e)
    263             {
    264                 theException = e;
    265                 LOGGER.warn( "got cancelled", e);
    266             }
    267         }
    268 
    269         public void hello()
    270         {
    271             block.take();
    272             try
    273             {
    274                 LOGGER.info("hello B");
    275             }
    276             finally
    277             {
    278                 block.release();
    279             }
    280         }
    281 
    282         private void callA() throws InterruptedException
    283         {
    284             block.take();
    285             try
    286             {
    287                 calling=true;
    288                 synchronized( marker)
    289                 {
    290                     marker.wait(120000);
    291                 }
    292                 LOGGER.info("call A");
    293                 a.hello();
    294             }
    295             finally
    296             {
    297                 block.release();
    298             }
    299         }
    300     }
    301 
    302     private A a;
    303     private B b;