sthread.h

Go to the documentation of this file.
00001 /* -*- mode:C++; c-basic-offset:4 -*-
00002      Shore-MT -- Multi-threaded port of the SHORE storage manager
00003    
00004                        Copyright (c) 2007-2009
00005       Data Intensive Applications and Systems Labaratory (DIAS)
00006                Ecole Polytechnique Federale de Lausanne
00007    
00008                          All Rights Reserved.
00009    
00010    Permission to use, copy, modify and distribute this software and
00011    its documentation is hereby granted, provided that both the
00012    copyright notice and this permission notice appear in all copies of
00013    the software, derivative works or modified versions, and any
00014    portions thereof, and that both notices appear in supporting
00015    documentation.
00016    
00017    This code is distributed in the hope that it will be useful, but
00018    WITHOUT ANY WARRANTY; without even the implied warranty of
00019    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. THE AUTHORS
00020    DISCLAIM ANY LIABILITY OF ANY KIND FOR ANY DAMAGES WHATSOEVER
00021    RESULTING FROM THE USE OF THIS SOFTWARE.
00022 */
00023 
00024 // -*- mode:c++; c-basic-offset:4 -*-
00025 /*<std-header orig-src='shore' incl-file-exclusion='STHREAD_H'>
00026 
00027  $Id: sthread.h,v 1.206 2010/10/27 17:04:30 nhall Exp $
00028 
00029 SHORE -- Scalable Heterogeneous Object REpository
00030 
00031 Copyright (c) 1994-99 Computer Sciences Department, University of
00032                       Wisconsin -- Madison
00033 All Rights Reserved.
00034 
00035 Permission to use, copy, modify and distribute this software and its
00036 documentation is hereby granted, provided that both the copyright
00037 notice and this permission notice appear in all copies of the
00038 software, derivative works or modified versions, and any portions
00039 thereof, and that both notices appear in supporting documentation.
00040 
00041 THE AUTHORS AND THE COMPUTER SCIENCES DEPARTMENT OF THE UNIVERSITY
00042 OF WISCONSIN - MADISON ALLOW FREE USE OF THIS SOFTWARE IN ITS
00043 "AS IS" CONDITION, AND THEY DISCLAIM ANY LIABILITY OF ANY KIND
00044 FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
00045 
00046 This software was developed with support by the Advanced Research
00047 Project Agency, ARPA order number 018 (formerly 8230), monitored by
00048 the U.S. Army Research Laboratory under contract DAAB07-91-C-Q518.
00049 Further funding for this work was provided by DARPA through
00050 Rome Research Laboratory Contract No. F30602-97-2-0247.
00051 
00052 */
00053 
00054 /*  -- do not edit anything above this line --   </std-header>*/
00055 
00056 /*
00057  * The SHORE threads layer has some historical roots in
00058  * the NewThreads implementation wrapped up as c++ objects.
00059  *
00060  * With release 6.0 of the SHORE Storage Manager, the NewThreads
00061  * functionality was substantially obviated.  Some bits and pieces
00062  * of the SHORE threads layer remain in the synchronization variables
00063  * in the sthread_t API.
00064  *
00065  * To the extent that any NewThreads code remains here, 
00066  * the following copyright applies: 
00067  *
00068  *   NewThreads is Copyright 1992, 1993, 1994, 1995, 1996, 1997 by:
00069  *
00070  *    Josef Burger    <bolo@cs.wisc.edu>
00071  *    Dylan McNamee   <dylan@cse.ogi.edu>
00072  *    Ed Felten       <felten@cs.princeton.edu>
00073  *
00074  *   All Rights Reserved.
00075  *
00076  *   NewThreads may be freely used as long as credit is given
00077  *   to the above authors and the above copyright is maintained.
00078  */
00079 
00080 /**\file sthread.h
00081  *\ingroup MACROS
00082  *
00083  * This file contains the Shore Threads API.
00084  */
00085 
00086 #ifndef STHREAD_H
00087 #define STHREAD_H
00088 
00089 #include "w_defines.h"
00090 #include "w_rc.h"
00091 #include "atomic_templates.h"
00092 #include "stime.h"
00093 #include "gethrtime.h"
00094 #include <vtable.h>
00095 #include <w_list.h>
00096 
00097 // this #include reflects the fact that sthreads is now just a pthreads wrapper
00098 #include <pthread.h>
00099 #include <sthread_stats.h>
00100 
00101 class sthread_t;
00102 class smthread_t;
00103 
00104 
00105 #ifdef __GNUC__
00106 #pragma interface
00107 #endif
00108 
00109 #ifndef SDISK_H
00110 #include <sdisk.h>
00111 #endif
00112 
00113 class vtable_row_t;
00114 class vtable_t;
00115 
00116 struct sthread_core_t;
00117 
00118 extern "C" void         dumpthreads(); // for calling from debugger
00119 
00120 
00121 /**\brief Base class for sthreads.  See \ref timeout_in_ms, \ref timeout_t
00122  */
00123 class sthread_base_t : public w_base_t {
00124 public:
00125 /**\cond skip */
00126     typedef unsigned int w_thread_id_t; // TODO REMOVE
00127     typedef w_thread_id_t id_t;
00128 /**\endcond skip */
00129 
00130     /* XXX this is really something for the SM, not the threads package;
00131        only WAIT_IMMEDIATE should ever make it to the threads package. */
00132 
00133     /**\enum timeout_t
00134      * \brief Special values for timeout_in_ms.
00135      *
00136      * \details sthreads package recognizes 2 WAIT_* values:
00137      * == WAIT_IMMEDIATE
00138      * and != WAIT_IMMEDIATE.
00139      *
00140      * If it's not WAIT_IMMEDIATE, it's assumed to be
00141      * a positive integer (milliseconds) used for the
00142      * select timeout.
00143      * WAIT_IMMEDIATE: no wait
00144      * WAIT_FOREVER:   may block indefinitely
00145      * The user of the thread (e.g., sm) had better
00146      * convert timeout that are negative values (WAIT_* below)
00147      * to something >= 0 before calling block().
00148      *
00149      * All other WAIT_* values other than WAIT_IMMEDIATE
00150      * are handled by sm layer:
00151      * WAIT_SPECIFIED_BY_THREAD: pick up a timeout_in_ms from the smthread.
00152      * WAIT_SPECIFIED_BY_XCT: pick up a timeout_in_ms from the transaction.
00153      * Anything else: not legitimate.
00154      * 
00155      * \sa timeout_in_ms
00156      */
00157     enum timeout_t {
00158     WAIT_IMMEDIATE     = 0, 
00159     WAIT_FOREVER     = -1,
00160     WAIT_SPECIFIED_BY_THREAD     = -4, // used by lock manager
00161     WAIT_SPECIFIED_BY_XCT = -5, // used by lock manager
00162     WAIT_NOT_USED = -6 // indicates last negative number used by sthreads
00163     };
00164     /* XXX int would also work, sized type not necessary */
00165     /**\typedef int4_t timeout_in_ms;
00166      * \brief Timeout in milliseconds if > 0
00167      * \details
00168      * sthread_t blocking methods take a timeout in milliseconds.
00169      * If the value is < 0, then it's expected to be a member of the
00170      * enumeration type timeout_t.
00171      *
00172      * \sa timeout_t
00173      */
00174     typedef int4_t timeout_in_ms;
00175 
00176 /**\cond skip */
00177     static const w_error_t::info_t     error_info[];
00178     static void  init_errorcodes();
00179 
00180 #include "st_error_enum_gen.h"
00181 
00182     enum {
00183     stOS = fcOS,
00184     stINTERNAL = fcINTERNAL,
00185     stNOTIMPLEMENTED = fcNOTIMPLEMENTED 
00186     };
00187 
00188     /* import sdisk base */
00189     typedef sdisk_base_t::fileoff_t    fileoff_t;
00190     typedef sdisk_base_t::filestat_t   filestat_t;
00191     typedef sdisk_base_t::iovec_t      iovec_t;
00192 
00193 
00194     /* XXX magic number */
00195     enum { iovec_max = 8 };
00196 
00197     enum {
00198     OPEN_RDWR = sdisk_base_t::OPEN_RDWR,
00199     OPEN_RDONLY = sdisk_base_t::OPEN_RDONLY,
00200     OPEN_WRONLY = sdisk_base_t::OPEN_WRONLY,
00201 
00202     OPEN_SYNC = sdisk_base_t::OPEN_SYNC,
00203     OPEN_TRUNC = sdisk_base_t::OPEN_TRUNC,
00204     OPEN_CREATE = sdisk_base_t::OPEN_CREATE,
00205     OPEN_EXCL = sdisk_base_t::OPEN_EXCL,
00206     OPEN_APPEND = sdisk_base_t::OPEN_APPEND,
00207     OPEN_RAW = sdisk_base_t::OPEN_RAW
00208     };
00209     enum {
00210     SEEK_AT_SET = sdisk_base_t::SEEK_AT_SET,
00211     SEEK_AT_CUR = sdisk_base_t::SEEK_AT_CUR,
00212     SEEK_AT_END = sdisk_base_t::SEEK_AT_END
00213     };
00214 /**\endcond skip */
00215 };
00216 
00217 /**\cond skip */
// Fixed-size storage for a thread (or other named object) name.
class sthread_name_t
{
public:
    // Size, in bytes, of the name buffer.
    enum { NAME_ARRAY = 64 };

    sthread_name_t();
    ~sthread_name_t();

    // Set the name from up to three strings; n2 and n3 are optional.
    // NOTE(review): how the pieces are combined and truncated is decided
    // by the out-of-line definition, which is not visible here.
    void rename(const char* n1, const char* n2 = 0, const char* n3 = 0);

    // The name buffer itself (public; read directly by
    // sthread_named_base_t::name()).
    char _name[NAME_ARRAY];
};
00229 
00230 class sthread_named_base_t: public sthread_base_t
00231 {
00232 public:
00233     NORET            sthread_named_base_t(
00234     const char*            n1 = 0,
00235     const char*            n2 = 0,
00236     const char*            n3 = 0);
00237     NORET            ~sthread_named_base_t();
00238     
00239     void            rename(
00240     const char*            n1,
00241     const char*            n2 = 0,
00242     const char*            n3 = 0);
00243 
00244     const char*            name() const;
00245     void                   unname();
00246 
00247 private:
00248     sthread_name_t        _name;
00249 };
00250 
00251 inline NORET
00252 sthread_named_base_t::sthread_named_base_t(
00253     const char*        n1,
00254     const char*        n2,
00255     const char*        n3)
00256 {
00257     rename(n1, n2, n3);
00258 
00259 }
00260 
00261 inline const char*
00262 sthread_named_base_t::name() const
00263 {
00264     return _name._name;
00265 }
00266 
00267 class sthread_main_t;
00268 
00269 /**\endcond skip */
00270 
00271 /**\brief A callback class for traversing the list of all sthreads.
00272  * \details
00273  * Use with for_each_thread. Somewhat costly because it's thread-safe.
00274  */
00275 class ThreadFunc
00276 {
00277     public:
00278     virtual void operator()(const sthread_t& thread) = 0;
00279     virtual NORET ~ThreadFunc() {}
00280 };
00281 
00282 
00283 class sthread_init_t;
00284 class sthread_main_t;
00285 
00286 // these macros allow us to notify the SunStudio race detector about lock acquires/releases
00287 
00288 #include "os_interface.h"
00289 
00290 /**\brief A test-and-test-and-set spinlock. 
00291  *
00292  * This lock is good for short, uncontended critical sections. 
00293  * If contention is high, use an mcs_lock. 
00294  * Long critical sections should use pthread_mutex_t.
00295  *
00296  * Tradeoffs are:
00297  *  - test-and-test-and-set locks: low-overhead but not scalable
00298  *  - queue-based locks: higher overhead but scalable
00299  *  - pthread mutexes : very high overhead and blocks, but frees up 
00300  *  cpu for other threads when number of cpus is fewer than number of threads
00301  *
00302  *  \sa REFSYNC
00303  */
00304 struct tatas_lock {
00305     /**\cond skip */
00306     enum { NOBODY=0 };
00307     typedef union  {
00308         pthread_t         handle;
00309 #undef CASFUNC 
00310 #if SIZEOF_PTHREAD_T==4
00311 #define CASFUNC atomic_cas_32
00312         unsigned int       bits;
00313 #elif SIZEOF_PTHREAD_T==8
00314 # define CASFUNC atomic_cas_64
00315         uint64_t           bits;
00316 #elif SIZEOF_PTHREAD_T==0
00317 #error  Configuration could not determine size of pthread_t. Fix configure.ac.
00318 #else 
00319 #error  Configuration determined size of pthread_t is unexpected. Fix sthread.h.
00320 #endif
00321     } holder_type_t;
00322     volatile holder_type_t _holder;
00323     /**\endcond skip */
00324 
00325     tatas_lock() { _holder.bits=NOBODY; }
00326 
00327 private:
00328     // CC mangles this as __1cKtatas_lockEspin6M_v_
00329     /// spin until lock is free
00330     void spin() { while(*&(_holder.handle)) ; }
00331 
00332 public:
00333     /// Try to acquire the lock immediately.
00334     bool try_lock() 
00335     {
00336         holder_type_t tid = { pthread_self() };
00337         bool success = false;
00338         unsigned int old_holder = 
00339                         CASFUNC(&_holder.bits, NOBODY, tid.bits);
00340         if(old_holder == NOBODY) {
00341             membar_enter();
00342             success = true;
00343         }
00344         
00345         return success;
00346     }
00347 
00348     /// Acquire the lock, spinning as long as necessary. 
00349     void acquire() {
00350         w_assert1(!is_mine());
00351         holder_type_t tid = { pthread_self() };
00352         do {
00353             spin();
00354         }
00355         while(CASFUNC(&_holder.bits, NOBODY, tid.bits));
00356         membar_enter();
00357         w_assert1(is_mine());
00358     }
00359 
00360     /// Release the lock
00361     void release() {
00362         membar_exit();
00363         w_assert1(is_mine()); // moved after the membar
00364         _holder.bits= NOBODY;
00365 #if W_DEBUG_LEVEL > 0
00366         {
00367             membar_enter(); // needed for the assert?
00368             w_assert1(!is_mine());
00369         }
00370 #endif
00371     }
00372 
00373     /// True if this thread is the lock holder
00374     bool is_mine() const { return 
00375         pthread_equal(_holder.handle, pthread_self()) ? true : false; }
00376 #undef CASFUNC 
00377 };
00378 
00379 /**\brief Wrapper for pthread mutexes, with a queue-based lock API.
00380  *
00381  * When the storage manager is configured with the default,
00382  * --enable-pthread-mutex, this lock uses a Pthreads mutex for the lock.
00383  * In this case, it is not a true queue-based lock, since
00384  * release doesn't inform the next node in the queue, and in fact the
00385  * nodes aren't kept in a queue.
00386  * It just gives pthread mutexes the same API as the other
00387  * queue-based locks so that we use the same idioms for
00388  * critical sections based on different kinds of locks.
00389  * By configuring with pthreads mutexes implementing this class, the
00390  * server can spawn any number of threads, regardless of the number
00391  * of hardware contexts available; threads will block as necessary.
00392  *
00393  * When the storage manager is configured with 
00394  * --disable-pthread-mutex, this lock uses an MCS (\ref MCS1) queue-based
00395  * lock for the lock.
00396  * In this case, it is a true queue-based lock.
00397  * By configuring with MCS locks implementing this class, if the
00398  * server spawn many more threads than hardware contexts, time can be wasted
00399  * spinning; threads will not block until the operating system (or underlying 
00400  * thread scheduler) determines to block the thread.
00401  *
00402  * The idiom for using these locks is
00403  * that the qnode is on a threads's stack, so the qnode
00404  * implicitly identifies the owning thread.
00405  *
00406  * This allows us to add an is_mine() capability that otherwise
00407  * the pthread mutexen don't have.
00408  *
00409  * Finally, using this class ensures that the pthread_mutex_init/destroy
00410  * is done (in the --enable-pthread-mutex case).
00411  *
00412  *  See also: \ref REFSYNC
00413  *
00414  */
00415 struct w_pthread_lock_t 
00416 {
00417     /**\cond skip */
00418     struct ext_qnode {
00419         w_pthread_lock_t* _held;
00420     };
00421 #define PTHREAD_EXT_QNODE_INITIALIZER { NULL }
00422 #define PTHREAD_EXT_QNODE_INITIALIZE(x) (x)._held =  NULL
00423 
00424     typedef ext_qnode volatile* ext_qnode_ptr;
00425     /**\endcond skip */
00426 
00427 private:
00428     pthread_mutex_t     _mutex; // w_pthread_lock_t blocks on this
00429     /// Holder is this struct if acquire is successful.
00430     w_pthread_lock_t *  _holder;
00431 
00432 public:
00433     w_pthread_lock_t() :_holder(0) { pthread_mutex_init(&_mutex, 0); }
00434 
00435     ~w_pthread_lock_t() { w_assert1(!_holder); pthread_mutex_destroy(&_mutex);}
00436     
00437     /// Returns true if success.
00438     bool attempt(ext_qnode* me) {
00439         if(attempt( *me)) {
00440             me->_held = this;
00441             _holder = this;
00442             return true;
00443         }
00444         return false;
00445     }
00446 
00447 private:
00448     /// Returns true if success. Helper for attempt(ext_qnode *).
00449     bool attempt(ext_qnode & me) {
00450         w_assert1(!is_mine(&me));
00451         w_assert0( me._held == 0 );  // had better not 
00452         // be using this qnode for another lock!
00453         return pthread_mutex_trylock(&_mutex) == 0;
00454     }
00455 
00456 public:
00457     /// Acquire the lock and set the qnode to refer to this lock.
00458     void* acquire(ext_qnode* me) {
00459         w_assert1(!is_mine(me));
00460         w_assert1( me->_held == 0 );  // had better not 
00461         // be using this qnode for another lock!
00462         pthread_mutex_lock(&_mutex);
00463         me->_held = this;
00464         _holder = this;
00465 #if W_DEBUG_LEVEL > 0
00466         {
00467             membar_enter(); // needed for the assert
00468             w_assert1(is_mine(me)); // TODO: change to assert2
00469         }
00470 #endif
00471         return 0;
00472     }
00473 
00474     /// Release the lock and clear the qnode.
00475     void release(ext_qnode &me) { release(&me); }
00476 
00477     /// Release the lock and clear the qnode.
00478     void release(ext_qnode_ptr me) { 
00479         // assert is_mine:
00480         w_assert1( _holder == me->_held ); 
00481         w_assert1(me->_held == this); 
00482          me->_held = 0; 
00483         _holder = 0;
00484         pthread_mutex_unlock(&_mutex); 
00485 #if W_DEBUG_LEVEL > 10
00486   // This is racy since the containing structure could
00487   // have been freed by the time we do this check.  Thus,
00488   // we'll remove it.
00489         {
00490             membar_enter(); // needed for the assertions?
00491             w_pthread_lock_t *h =  _holder;
00492             w_pthread_lock_t *m =  me->_held;
00493             w_assert1( (h==NULL && m==NULL)
00494                 || (h  != m) );
00495         }
00496 #endif
00497     }
00498 
00499     /**\brief Return true if this thread holds the lock.
00500      *
00501      * This method doesn't actually check for this pthread
00502      * holding the lock, but it checks that the qnode reference
00503      * is to this lock.  
00504      * The idiom for using these locks is
00505      * that the qnode is on a threads's stack, so the qnode
00506      * implicitly identifies the owning thread.
00507      */
00508     
00509     bool is_mine(ext_qnode* me) const { 
00510        if( me->_held == this ) {
00511            // only valid if is_mine 
00512           w_assert1( _holder == me->_held ); 
00513           return true;
00514        }
00515        return false;
00516     }
00517 };
00518 
00519 /**\def USE_PTHREAD_MUTEX
00520  * \brief If defined and value is 1, use pthread-based mutex for queue_based_lock_t
00521  *
00522  * \details
00523  * The Shore-MT release contained alternatives for scalable locks in
00524  * certain places in the storage manager; it was released with
00525  * these locks replaced by pthreads-based mutexes.
00526  *
00527  * You can disable the use of pthreads-based mutexes and use the
00528  * mcs-based locks by configuring with --disable-pthread-mutex.
00529  */
00530 
00531 /**\defgroup SYNCPRIM Synchronization Primitives
00532  *\ingroup UNUSED 
00533  *
00534  * sthread/sthread.h: As distributed, a queue-based lock 
00535  * is a w_pthread_lock_t,
00536  * which is a wrapper around a pthread lock to give it a queue-based-lock API.
00537  * True queue-based locks are not used, nor are time-published
00538  * locks.
00539  * Code for these implementations is included for future 
00540  * experimentation, along with typedefs that should allow
00541  * easy substitution, as they all should have the same API.
00542  *
00543  * We don't offer the spin implementations at the moment.
00544  */
00545 /*
00546  * These typedefs are included to allow substitution at some  point.
00547  * Where there is a preference, the code should use the appropriate typedef.
00548  */
00549 
00550 typedef w_pthread_lock_t queue_based_block_lock_t; // blocking impl always ok
00551 #define QUEUE_BLOCK_EXT_QNODE_INITIALIZER PTHREAD_EXT_QNODE_INITIALIZER
00552 // non-static initialize:
00553 #define QUEUE_BLOCK_EXT_QNODE_INITIALIZE(x) x._held = NULL
00554 
00555 #if defined(USE_PTHREAD_MUTEX) && USE_PTHREAD_MUTEX==1
00556 typedef w_pthread_lock_t queue_based_spin_lock_t; // spin impl preferred
00557 typedef w_pthread_lock_t queue_based_lock_t; // might want to use spin impl
00558 #define QUEUE_SPIN_EXT_QNODE_INITIALIZER PTHREAD_EXT_QNODE_INITIALIZER
00559 #define QUEUE_EXT_QNODE_INITIALIZER      PTHREAD_EXT_QNODE_INITIALIZER
00560 // non-static initialize:
00561 #define QUEUE_EXT_QNODE_INITIALIZE(x) x._held = NULL;
00562 #else
00563 #include <mcs_lock.h>
00564 typedef mcs_lock queue_based_spin_lock_t; // spin preferred
00565 typedef mcs_lock queue_based_lock_t;
00566 #define QUEUE_SPIN_EXT_QNODE_INITIALIZER MCS_EXT_QNODE_INITIALIZER
00567 #define QUEUE_EXT_QNODE_INITIALIZER      MCS_EXT_QNODE_INITIALIZER
00568 // non-static initialize:
00569 #define QUEUE_EXT_QNODE_INITIALIZE(x) MCS_EXT_QNODE_INITIALIZE(x)
00570 #endif
00571 
00572 #ifndef SRWLOCK_H
00573 #include <srwlock.h>
00574 #endif
00575 
/**\brief A multiple-reader/single-writer lock based on pthreads (blocking)
 *
 * Use this to protect data structures that get hammered by
 * reads and where updates are very rare.
 * It is used in the storage manager by the histograms (histo.cpp), 
 * and in place of some mutexen, where strict exclusion isn't required.
 *
 * This lock is used in the storage manager by the checkpoint thread
 * (the only acquire-writer) and other threads to be sure they don't
 * do certain nasty things when a checkpoint is going on.
 *
 * Unlike the queue-based locks, the acquire/release methods here take
 * no qnode argument.
 *
 *  See also: \ref REFSYNC
 *
 */
struct occ_rwlock
{
    occ_rwlock();
    ~occ_rwlock();

    /// The normal way to acquire a read lock.
    void acquire_read();
    /// The normal way to release a read lock.
    void release_read();

    /// The normal way to acquire a write lock.
    void acquire_write();
    /// The normal way to release a write lock.
    void release_write();

    /**\cond skip */
    /// Read-side view with a plain acquire()/release() interface.
    /// Exposed for critical_section<>. Do not use directly.
    struct occ_rlock
    {
        occ_rwlock* _lock;

        void acquire()
        {
            _lock->acquire_read();
        }
        void release()
        {
            _lock->release_read();
        }
    };

    /// Write-side view with a plain acquire()/release() interface.
    /// Exposed for critical_section<>. Do not use directly.
    struct occ_wlock
    {
        occ_rwlock* _lock;

        void acquire()
        {
            _lock->acquire_write();
        }
        void release()
        {
            _lock->release_write();
        }
    };

    /// Exposed for the latch manager.. Do not use directly.
    occ_rlock* read_lock()
    {
        return &_read_lock;
    }

    /// Exposed for the latch manager.. Do not use directly.
    occ_wlock* write_lock()
    {
        return &_write_lock;
    }
    /**\endcond skip */

private:
    // NOTE(review): presumably the units in which a writer / a reader are
    // accounted for in _active_count -- confirm against the implementation.
    enum { WRITER = 1, READER = 2 };

    unsigned int volatile   _active_count;
    occ_rlock               _read_lock;
    occ_wlock               _write_lock;

    pthread_mutex_t _read_write_mutex;  // paired w/ _read_cond, _write_cond
    pthread_cond_t  _read_cond;         // paired w/ _read_write_mutex
    pthread_cond_t  _write_cond;        // paired w/ _read_write_mutex
};
00635 
00636 typedef w_list_t<sthread_t, queue_based_lock_t>        sthread_list_t;
00637 
00638 
00639 /**\brief Thread class for all threads that use the Shore Storage Manager.
00640  *  
00641  *  All threads that perform \b any work on behalf of the storage
00642  *  manager or call any storage manager API \b must be an sthread_t or
00643  *  a class derived from sthread_t.
00644  *
00645  *  Storage manager threads use block/unblock methods provided by
00646  *  sthread, and use thread-local storage (data attributes of
00647  *  sthread_t).
00648  *
00649  *  This class also provides an os-independent API for file-system
00650  *  calls (open, read, write, close, etc.) used by the storage manager.
00651  *
00652  *  This class is a fairly thin layer over pthreads.  Client threads
00653  *  may use pthread synchronization primitives. 
00654  */
00655 class sthread_t : public sthread_named_base_t  
00656 {
00657     friend class sthread_init_t;
00658     friend class sthread_main_t;
00659     /* For access to block() and unblock() */
00660     friend class latch_t;
00661     /* For access to I/O stats */
00662 
00663 
00664 public:
00665     static void  initialize_sthreads_package();
00666 
00667     enum status_t {
00668         t_defunct,    // thread has terminated
00669         t_virgin,    // thread hasn't started yet    
00670         t_ready,    // thread is ready to run
00671         t_running,    // when me() is this thread 
00672         t_blocked,      // thread is blocked on something
00673         t_boot        // system boot
00674     };
00675     static const char *status_strings[];
00676 
00677     enum priority_t {
00678         t_time_critical = 1,
00679         t_regular    = 0,
00680         max_priority    = t_time_critical,
00681         min_priority    = t_regular
00682     };
00683     static const char *priority_strings[];
00684 
00685     /* Default stack size for a thread */
00686     enum { default_stack = 64*1024 };
00687 
00688     /*
00689      *  Class member variables
00690      */
00691     void*             user;    // user can use this 
00692     const id_t        id;
00693 
00694     // max_os_file_size is used by the sm and set in
00695     // static initialization of sthreads (sthread_init_t in sthread.cpp)
00696     static w_base_t::int8_t     max_os_file_size;
00697 
00698 private:
00699 
00700     // ASSUMES WE ALREADY LOCKED self->_wait_lock
00701     static w_rc_t::errcode_t        _block(
00702                             timeout_in_ms          timeout = WAIT_FOREVER,
00703                             const char* const      caller = 0,
00704                             const void *           id = 0);
00705 
00706     static w_rc_t::errcode_t        _block(
00707                             pthread_mutex_t        *lock, 
00708                             timeout_in_ms          timeout = WAIT_FOREVER,
00709                             sthread_list_t*        list = 0,
00710                             const char* const      caller = 0,
00711                             const void *           id = 0);
00712 
00713     w_rc_t               _unblock(w_rc_t::errcode_t e);
00714 
00715 public:
00716     static void          timeout_to_timespec(timeout_in_ms timeout, 
00717                                              struct timespec &when);
00718     w_rc_t               unblock(w_rc_t::errcode_t e);
00719     static w_rc_t        block(
00720                             pthread_mutex_t        &lock,
00721                             timeout_in_ms          timeout = WAIT_FOREVER,
00722                             sthread_list_t*        list = 0,
00723                             const char* const      caller = 0,
00724                             const void *           id = 0);
00725     static w_rc_t::errcode_t       block(int4_t  timeout = WAIT_FOREVER);
00726 
00727     virtual void        _dump(ostream &) const; // to be over-ridden
00728 
00729     // these traverse all threads
00730     static void       dumpall(const char *, ostream &);
00731     static void       dumpall(ostream &);
00732 
00733     static void       dump_io(ostream &);
00734     static void       dump_event(ostream &);
00735 
00736     static void       dump_stats(ostream &);
00737     static void       reset_stats();
00738 
00739     /// Collect a row of a virtual table. One row per thread.
00740     /// Subclasses override this.
00741     virtual void      vtable_collect(vtable_row_t &); // to be over-ridden
00742     /// Stuff the attribute names in this row.
00743     static  void      vtable_collect_names(vtable_row_t &); // to be over-ridden
00744 
00745     /// Collect an entire table, one row per thread that the sthreads package
00746     /// knows about. If attr_names_too is true, the first row will be
00747     /// attribute names.
00748     static int        collect(vtable_t&v, bool attr_names_too=true); 
00749                         // in vtable_sthread.cpp
00750 
00751     static void      find_stack(void *address);
00752     static void      for_each_thread(ThreadFunc& f);
00753 
00754     /* request stack overflow check, die on error. */
00755     static void      check_all_stacks(const char *file = "",
00756                              int line = 0);
00757     bool             isStackOK(const char *file = "", int line = 0) const;
00758 
00759     /* Recursion, etc stack depth estimator */
00760     bool             isStackFrameOK(size_t size = 0);
00761 
00762     w_rc_t           set_priority(priority_t priority);
00763     priority_t       priority() const;
00764     status_t         status() const;
00765 
00766 private:
00767 
00768 #ifdef WITHOUT_MMAP
00769     static w_rc_t     set_bufsize_memalign(size_t size, 
00770                         char *&buf_start /* in/out*/, long system_page_size);
00771 #endif
00772 #ifdef HAVE_HUGETLBFS
00773 public:
00774     // Must be called if we are configured with  hugetlbfs
00775     static w_rc_t     set_hugetlbfs_path(const char *path);
00776 private:
00777     static w_rc_t     set_bufsize_huge(size_t size, 
00778                         char *&buf_start /* in/out*/, long system_page_size);
00779 #endif
00780     static w_rc_t     set_bufsize_normal(size_t size, 
00781                         char *&buf_start /* in/out*/, long system_page_size);
00782     static void       align_bufsize(size_t size, long system_page_size,
00783                                                 long max_page_size);
00784     static long       get_max_page_size(long system_page_size);
00785     static void       align_for_sm(size_t requested_size);
00786 
00787 public:
00788     static int          do_unmap(); 
00789     /*
00790      *  Concurrent I/O ops
00791      */
00792     static char*        set_bufsize(size_t size);
00793     static w_rc_t       set_bufsize(size_t size, char *&buf_start /* in/out*/,
00794                                     bool use_normal_if_huge_fails=false);
00795 
00796     static w_rc_t        open(
00797                             const char*            path,
00798                             int                flags,
00799                             int                mode,
00800                             int&                fd);
00801     static w_rc_t        close(int fd);
00802     static w_rc_t        read(
00803                             int                 fd,
00804                             void*                 buf,
00805                             int                 n);
00806     static w_rc_t        write(
00807                             int                 fd, 
00808                             const void*             buf, 
00809                             int                 n);
00810     static w_rc_t        readv(
00811                             int                 fd, 
00812                             const iovec_t*             iov,
00813                             size_t                iovcnt);
00814     static w_rc_t        writev(
00815                             int                 fd,
00816                             const iovec_t*                iov,
00817                             size_t                 iovcnt);
00818 
00819     static w_rc_t        pread(int fd, void *buf, int n, fileoff_t pos);
00820     static w_rc_t        pwrite(int fd, const void *buf, int n,
00821                            fileoff_t pos);
00822     static w_rc_t        lseek(
00823                             int                fd,
00824                             fileoff_t            offset,
00825                             int                whence,
00826                             fileoff_t&            ret);
00827     /* returns an error if the seek doesn't match its destination */
00828     static w_rc_t        lseek(
00829                             int                fd,
00830                             fileoff_t                offset,
00831                             int                whence);
00832     static w_rc_t        fsync(int fd);
00833     static w_rc_t        ftruncate(int fd, fileoff_t sz);
00834     static w_rc_t        fstat(int fd, filestat_t &sb);
00835     static w_rc_t        fisraw(int fd, bool &raw);
00836 
00837 
00838     /*
00839      *  Misc
00840      */
00841 private:
00842  // NOTE: this returns a REFERENCE to a pointer
00843  /* #\fn static sthread_t*& sthread_t::me_lval()
00844   ** \brief Returns a (writable) reference to the
00845   * thread-local pointer to the running sthread_t.
00846   * \ingroup TLS
00847   */
00848  inline static sthread_t*& me_lval() {
00849   /**\var sthread_t* _TLSme;
00850    * \brief Per-thread (__thread) pointer to the running sthread_t; starts out NULL.
00851    * \ingroup TLS
00852    */
00853   static __thread sthread_t* _TLSme(NULL);
00854   return _TLSme;
00855  }
00856 public:
00857     // NOTE: this returns a POINTER (by value): the current content of me_lval()
00858     static sthread_t*    me() { return me_lval(); }
00859                          // for debugging:
00860     pthread_t            myself(); // pthread_t associated with this 
00861     static int           rand(); // returns an int in [0, 2**31)
00862     static double        drand(); // returns a double in [0.0, 1)
00863     static int           randn(int max); // returns an int in [0, max)
00864 
00865     /* XXX  sleep, fork, and wait exit overlap the unix version. */
00866 
00867     // sleep for timeout milliseconds
00868     void                 sleep(timeout_in_ms timeout = WAIT_IMMEDIATE,
00869                          const char *reason = 0);
00870     void                 wakeup();
00871 
00872     // wait for a thread to finish running
00873     w_rc_t            join(timeout_in_ms timeout = WAIT_FOREVER);
00874 
00875     // start a thread
00876     w_rc_t            fork();
00877 
00878     // give up the processor
00879     static void        yield();
00880     ostream            &print(ostream &) const;
00881 
00882     // anyone can wait and delete a thread
00883     virtual            ~sthread_t();
00884 
00885     // functions to do a runtime down-cast to smthread_t;
00886     // they return 0 if the sthread is not derived from smthread_t.
00887     // should be removed when RTTI is supported
00888     virtual smthread_t*        dynamic_cast_to_smthread();
00889     virtual const smthread_t*  dynamic_cast_to_const_smthread() const;
00890 
00891 protected:
00892     sthread_t(
00893           priority_t    priority = t_regular,
00894           const char    *name = 0,
00895           unsigned        stack_size = default_stack);
00896 
00897     virtual void        before_run() { }
00898     virtual void        run() = 0;
00899     virtual void        after_run() { }
00900 
00901 private:
00902 
00903     /* start offset of sthread FDs, to differentiate from system FDs */
00904     enum { fd_base = 4000 };
00905     void *                      _start_frame;
00906     void *                      _danger;
00907     size_t                      _stack_size;
00908 
00909     pthread_mutex_t             _wait_lock; // paired with _wait_cond, also
00910                                 // protects _link
00911     pthread_cond_t              _wait_cond; // posted when thread should unblock
00912 
00913     pthread_mutex_t*            _start_terminate_lock; // _start_cond, _terminate_cond, _forked
00914     pthread_cond_t *            _start_cond; // paired w/ _start_terminate_lock
00915 
00916     volatile bool               _sleeping;
00917     volatile bool               _forked;
00918     bool                        _terminated; // protects against double calls
00919                                 // to sthread_core_exit
00920     volatile bool               _unblock_flag; // used internally by _block()
00921 
00922     fill4                       _dummy4valgrind;
00923     
00924     sthread_core_t *            _core;        // registers, stack, etc
00925     volatile status_t           _status;    // thread status
00926     priority_t                  _priority;     // thread priority
00927     w_rc_t::errcode_t           _rce;        // used in block/unblock
00928 
00929     w_link_t                    _link;        // protected by _wait_lock
00930 
00931     w_link_t                    _class_link;    // used in _class_list,
00932                                  // protected by _class_list_lock
00933     static sthread_list_t*      _class_list;
00934     static queue_based_lock_t   _class_list_lock; // for protecting _class_list
00935 
00936 
00937     /* XXX alignment probs in derived thread classes.  Sigh */
00938     // fill4                       _ex_fill;
00939 
00940     /* I/O subsystem */
00941     static    sdisk_t        **_disks;
00942     static    unsigned       open_max;
00943     static    unsigned       open_count;
00944 
00945     /* in-thread startup and shutdown */ 
00946     static void            __start(void *arg_thread);
00947     void                   _start();
00948 
00949 
00950     /* system initialization and shutdown */
00951     static w_rc_t        cold_startup();
00952     static w_rc_t        shutdown();
00953     static stime_t        boot_time;
00954     static sthread_t*    _main_thread; 
00955     static uint4_t        _next_id;    // unique id generator
00956 
00957 private:
00958     static int           _disk_buffer_disalignment;
00959     static size_t        _disk_buffer_size;
00960     static char *        _disk_buffer;
00961 public:
00962  // export so smthread can read it and so latch/srwlock can write it:
00963  sthread_stats        SthreadStats;
00964 };
00965 
00966 extern ostream &operator<<(ostream &o, const sthread_t &t);
00967 
00968 void print_timeout(ostream& o, const sthread_base_t::timeout_in_ms timeout);
00969 
00970 
00971 /**\cond skip */
00972 /**\brief The main thread. 
00973 *
00974 * Called from sthread_t::cold_startup(), which is
00975 * called from sthread_init_t::do_init(), which is 
00976 * called from sthread_t::initialize_sthreads_package(), which is called 
00977 * when the storage manager sets up options, among other places.
00978 */
00979 class sthread_main_t : public sthread_t  {
00980     friend class sthread_t; // gives sthread_t access to the protected constructor below
00981     
00982 protected:
00983     NORET            sthread_main_t(); // protected: only this class, derived classes and the friend can construct one
00984     virtual void        run(); // implements the pure virtual sthread_t::run()
00985 };
00986 /**\endcond skip */
00987 
00988 
00989 /**\cond skip */
00990 // MUTEX_ACQUIRE/MUTEX_RELEASE expand to complete statements (note the trailing ';'); MUTEX_IS_MINE is a plain expression.
00991 #define MUTEX_ACQUIRE(mutex)    W_COERCE((mutex).acquire());
00992 #define MUTEX_RELEASE(mutex)    (mutex).release();
00993 #define MUTEX_IS_MINE(mutex)    (mutex).is_mine()
00994 
00995 
00996 /**\def CRITICAL_SECTION(name, lock)
00997  *
00998  * This macro starts a critical section protected by the given lock
00999  * (2nd argument).  The critical_section structure it creates is
01000  * named by the 1st argument.
01001  * The rest of the scope (in which this macro is used) becomes the
01002  * scope of the critical section, since it is the destruction of this
01003  * critical_section structure that releases the lock.
01004  *
01005  * The lock can be released early with <name>.pause() or <name>.exit();
01006  * after pause() (but not after exit()) <name>.resume() reacquires it.
01007  * NOTE: in SPECIALIZE_CS-generated classes pause() does not disarm the
01008  * structure: destruction or exit() while paused releases a second time.
01009  *
01010  * \sa critical_section
01011  */
01012 #define CRITICAL_SECTION(name, lock) critical_section<__typeof__(lock)&> name(lock)
01013 
01014 template<class Lock>
01015 struct critical_section;
01016 
01017 /**\brief Helper class for CRITICAL_SECTION idiom (macro).
01018  *
01019  * The primary template is only declared here; the specializations acquire
01020  * the lock on construction and release it on destruction.  This Lock*&
01021  * form (lock named by a pointer lvalue) forwards *mutex to the Lock& form.
01022  * See the macros:
01023  * - SPECIALIZE_CS(Lock, Extra, ExtraInit, Acquire, Release)  
01024  * - CRITICAL_SECTION(name, lock) 
01025  */
01026 template<class Lock> struct critical_section<Lock*&> : public critical_section<Lock&> {
01027     // Declared with the plain class name: C++20 rejects a template-id as a constructor name.
01028     critical_section(Lock* mutex) : critical_section<Lock&>(*mutex) { }
01029 };
01030 
01031 /*
01032  * NOTE: I added ExtraInit to make the initialization happen so that
01033  * assertions about holding the mutex don't fail.
01034  * At the same time, I added a holder to the w_pthread_lock_t
01035  * implementation so I could make assertions about the holder outside
01036  * the lock implementation itself.  This might seem like doubly
01037  * asserting, but in the cases where the critical section isn't
01038  * based on a pthread mutex, we really should have this clean
01039  * initialization and then check the assertions.
01040  */
01041 
01042 /**\def SPECIALIZE_CS(Lock, Extra, ExtraInit, Acquire, Release) 
01043  * \brief Macro that enables use of CRITICAL_SECTION(name,lock)
01044  *\addindex SPECIALIZE_CS
01045  * 
01046  * \details
01047  * Defines critical_section<Lock&>, which holds a pointer to the given
01048  * lock plus the Extra member declaration (2nd macro argument), and
01049  *   - runs ExtraInit then Acquire upon construction,
01050  *   - runs Release upon destruction unless exit() or hand_off() was called,
01051  *   - pause()/resume() run Release/Acquire without disarming the destructor,
01052  *   - exit() runs Release and disarms; hand_off() disarms without Release
01053  *     and returns the lock (it must not be called once disarmed).
01054  *
01055  */
01056 #define SPECIALIZE_CS(Lock,Extra,ExtraInit,Acquire,Release) \
01057 template<>  struct critical_section<Lock&> { \
01058     critical_section(Lock &mutex) \
01059     : _mutex(&mutex)          \
01060     {   ExtraInit; Acquire; } \
01061     ~critical_section() {     \
01062         /* _mutex is NULL after exit() or hand_off(): nothing left to release */ \
01063         if(_mutex) { Release; } \
01064         _mutex = NULL;        \
01065     }                         \
01066     void pause() { Release; }  /* keeps _mutex so resume() can reacquire */ \
01067     void resume() { Acquire; } \
01068     void exit() { Release; _mutex = NULL; } \
01069     Lock &hand_off() {        /* give up the lock without releasing it */ \
01070         Lock* rval = _mutex;  \
01071         _mutex = NULL;        \
01072         return *rval;         \
01073     }                         \
01074 private:                      \
01075     Lock* _mutex;             \
01076     Extra;                    \
01077     void operator=(critical_section const &);   /* private: no assignment */ \
01078     critical_section(critical_section const &); /* private: no copying */    \
01079 }
01080 
01081 
01082 // I undef-ed this and found all occurrences of CRITICAL_SECTION with this,
01083 // and hand-checked them.
01084 SPECIALIZE_CS(pthread_mutex_t, int _dummy,  (_dummy=0), 
01085     pthread_mutex_lock(_mutex), pthread_mutex_unlock(_mutex));
01086 
01087 // tatas_lock doesn't have is_mine, but I changed its release()
01088 // to Release and through compiling saw everywhere that uses release,
01089 // and fixed those places
01090 SPECIALIZE_CS(tatas_lock, int _dummy, (_dummy=0), 
01091     _mutex->acquire(), _mutex->release());
01092 
01093 // queue_based_lock_t asserts is_mine() in release(); _me is the per-section ext_qnode handed to acquire()/release()
01094 SPECIALIZE_CS(w_pthread_lock_t, w_pthread_lock_t::ext_qnode _me, (_me._held=0), 
01095     _mutex->acquire(&_me), _mutex->release(&_me));
01096 #if !defined(USE_PTHREAD_MUTEX) || USE_PTHREAD_MUTEX==0 // mcs_lock form only when USE_PTHREAD_MUTEX is undefined or 0
01097 SPECIALIZE_CS(mcs_lock, mcs_lock::ext_qnode _me, (_me._held=0), 
01098     _mutex->acquire(&_me), _mutex->release(&_me));
01099 #endif // !defined(USE_PTHREAD_MUTEX) || USE_PTHREAD_MUTEX==0
01100 
01101 SPECIALIZE_CS(occ_rwlock::occ_rlock, int _dummy, (_dummy=0), 
01102     _mutex->acquire(), _mutex->release());
01103 
01104 SPECIALIZE_CS(occ_rwlock::occ_wlock, int _dummy, (_dummy=0), 
01105     _mutex->acquire(), _mutex->release());
01106 
01107 
01108 // Accessor: reports the value currently held in _priority.
01109 inline sthread_t::priority_t sthread_t::priority() const
01110 {
01111     return this->_priority;
01112 }
01113 
01114 // Accessor: reports a copy of the (volatile) _status member.
01115 inline sthread_t::status_t sthread_t::status() const
01116 {
01117     return this->_status;
01118 }
01119 
01120 #include <w_strstream.h>
01121 // Need string.h to get strerror_r 
01122 #include <string.h>
01123 /* Evaluates x once; a nonzero result other than PTHREAD_BARRIER_SERIAL_THREAD is reported through W_FATAL_MSG. */
01124 #define DO_PTHREAD_BARRIER(x) \
01125 {   int res = x; \
01126     if(res && res != PTHREAD_BARRIER_SERIAL_THREAD) { \
01127        w_ostrstream S; \
01128        S << "Unexpected result from " << #x << " " << res << " "; \
01129        char buf[100] = ""; /* pre-terminated: the GNU strerror_r may leave buf untouched */ \
01130        (void) strerror_r(res, &buf[0], sizeof(buf)); \
01131        S << buf << ends; \
01132        W_FATAL_MSG(fcINTERNAL, << S.c_str()); \
01133     }  \
01134 }
01135 #define DO_PTHREAD(x) \
01136 {   int res = x; /* x is evaluated exactly once; any nonzero result is treated as a failure */ \
01137     if(res) { \
01138        w_ostrstream S; \
01139        S << "Unexpected result from " << #x << " " << res << " "; \
01140        char buf[100] = ""; /* pre-terminated: the GNU strerror_r may leave buf untouched */ \
01141        (void) strerror_r(res, &buf[0], sizeof(buf)); \
01142        S << buf << ends; \
01143        W_FATAL_MSG(fcINTERNAL, << S.c_str()); \
01144     }  \
01145 }
01146 #define DO_PTHREAD_TIMED(x) \
01147 {   int res = x; /* x is evaluated exactly once */ \
01148     if(res && res != ETIMEDOUT) { /* ETIMEDOUT is tolerated, like 0 */ \
01149         W_FATAL_MSG(fcINTERNAL, \
01150                 <<"Unexpected result from " << #x << " " << res); \
01151     } \
01152 }
01153 
01154 /**\endcond skip */
01155 
01156 
01157 /*<std-footer incl-file-exclusion='STHREAD_H'>  -- do not edit anything below this line -- */
01158 
01159 #endif          /*</std-footer>*/

Generated on Mon Nov 8 11:12:38 2010 for Shore Storage Manager by  doxygen 1.4.7