Source listing
The main thread starts a connection that is named con1 and declares a cursor on table t. It then opens the cursor and makes connection con1 dormant. The main thread then starts six threads (six instances of the threads_all() function) and waits for the threads to complete their work with the pthread_join() DCE call.
Each thread uses the connection con1 and the opened
cursor to perform a fetch operation. After the fetch operation, the
program makes the connection dormant. Threads use connection con1 in
a sequential manner because only one thread can use the connection
at a time. Each thread reads the next record from the t table.
/* **************************************************************
* Program Name: thread_safe()
*
* purpose : If a server connection is initiated with the WITH
* CONCURRENT TRANSACTION clause, an ongoing transaction
* can be shared across threads that subsequently
* connect to that server.
* In addition, if an open cursor is associated with such
* connection, the cursor will remain open when the
* connection
* is made dormant. Therefore, multiple threads can share a
* cursor.
*
* Methods : - Create database db_con221 and table t1.
* - Insert 6 rows into table t1, i.e. values 1 through 6.
* - Connect to db_con221 as con1 with CONCURRENT
* TRANSACTION.
* The CONCURRENT TRANSACTION option is required since
* all
* threads use the cursor throughout the same
* connection.
* - Declare c1 cursor for "select a from t1 order by a".
* - Open the cursor.
* - Start 6 threads. Use DCE pthread_join() to determine if
* all threads complete & all threads do same thing as
* follows.
* For thread_1, thread_2, ..., thread_6:
* o SET CONNECTION con1
* o FETCH a record and display it.
* o SET CONNECTION con1 DORMANT
* - Disconnect all connections.
**************************************************************
*/
#include <pthread.h>
#include <dce/dce_error.h>
/* global definitions */
#define num_threads 6
/* Function definitions */
static void thread_all();
static long dr_dbs();
static int checksql(char *str, long expected_err, char *con_name);
static void dce_err();
/* Host variables declaration */
EXEC SQL BEGIN DECLARE SECTION;
char con1[] = "con1";
EXEC SQL END DECLARE SECTION;
/* ****************************************************
* Main Thread
******************************************************/
main()
{
/* create database */
EXEC SQL create database db_con221 with log;
if (! checksql("create database", 0, EMPTYSTR))
{
printf("MAIN:: create database returned status {%d}\n", SQLCODE);
exit(1);
}
EXEC SQL create table t1( sales int);
if (! checksql( "create_table", 0, EMPTYSTR))
{
dr_dbs("db_con221");
printf("MAIN:: create table returned status {%d}\n", SQLCODE);
exit(1);
}
if ( populate_tab() != FUNCSUCC)
{
dr_dbs("db_con221");
printf("MAIN:: returned status {%d}\n", SQLCODE);
exit(1);
}
EXEC SQL close database;
checksql("[main] <close database>", 0, EMPTYSTR);
/* Establish connection 'con1' */
EXEC SQL connect to 'db_con221' as 'con1' WITH CONCURRENT TRANSACTION;
if (! checksql("MAIN:: <close database>", 0, EMPTYSTR))
{
dr_dbs("db_con221");
exit(1);
}
/* Declare cursor c1 associated with the connection con1 */
EXEC SQL prepare tabid from "select sales from t1 order by sales";
checksql("MAIN:: <prepare>", 0, EMPTYSTR);
EXEC SQL declare c1 cursor for tabid;
checksql("MAIN:: <declare c1 cursor for>", 0, EMPTYSTR);
/* Open cursor c1 and make the connection dormant */
EXEC SQL open c1;
checksql("MAIN:: <open c1>", 0, EMPTYSTR);
EXEC SQL set connection :con1 dormant;
checksql("MAIN:: <set connection con1 dormant>", 0, EMPTYSTR);
/* Start threads */
start_threads();
/* Close cursor and drop database */
EXEC SQL set connection :con1;
checksql("MAIN:: set connection", 0, EMPTYSTR);
EXEC SQL close c1;
checksql("MAIN:: <close cursor>", 0, EMPTYSTR);
EXEC SQL free c1;
checksql("MAIN:: <free cursor>", 0, EMPTYSTR);
EXEC SQL disconnect all;
checksql("MAIN:: disconnect all", 0, EMPTYSTR);
dr_dbs("db_con221");
} /* end of Main Thread */
/**********************************************************************
* Function: thread_all()
* Purpose : Uses connection con1 and fetches a row from table t1 using *
cursor c1.
* Returns : Nothing
**********************************************************************/
static void thread_all(thread_num)
int *thread_num;
{
EXEC SQL BEGIN DECLARE SECTION;
int val;
EXEC SQL END DECLARE SECTION;
/* Wait for the connection to become available */
do {
EXEC SQL set connection :con1;
} while (SQLCODE == -1802);
checksql("thread_all: set connection", 0, con1);
/* Fetch a row */
EXEC SQL fetch c1 into :val;
checksql("thread_all: fetch c1 into :val", 0, con1);
/* Free connection con1 */
EXEC SQL set connection :con1 dormant;
checksql("thread_all: set connection con1 dormant", 0, EMPTYSTR);
printf("Thread id %d fetched value %d from t1\n", *thread_num, val);
} /* thread_all() */
/**********************************************************************
* Function: start_threads()
* purpose : Create num_threads and passes a thread id number to each
* thread
**********************************************************************/
start_threads()
{
int thread_num[num_threads];
pthread_t thread_id[num_threads];
int i, ret, return_value;
for(i=0; i< num_threads; i++)
{
thread_num[i] = i;
if ((pthread_create(&thread_id[i], pthread_attr_default
(pthread_startroutine_t ) thread_all, &thread_num[i])) == -1)
{
dce_err(__FILE__, "pthread_create failed", (unsigned long)-1);
dr_dbs("db_con221");
exit(1);
}
}
/* Wait for all the threads to complete their work */
for(i=0; i< num_threads; i++)
{
ret = pthread_join(thread_id[i], (pthread_addr_t *) &return_value);
if(ret == -1)
{
dce_err(__FILE__, "pthread_join", (unsigned long)-1);
dr_dbs("db_con221");
exit(1);
}
}
} /* start_threads() */
/**********************************************************************
* Function: populate_tab()
* Purpose : insert values in table t1.
* Returns : FUNCSUCC on success and FUNCFAIL when it fails.
**********************************************************************/
static int
populate_tab()
{
EXEC SQL BEGIN DECLARE SECTION;
int i;
EXEC SQL END DECLARE SECTION;
EXEC SQL begin work;
if (!checksql("begin work", 0,EMPTYSTR))
return FUNCFAIL;
for (i=1; i<=num_threads; i++)
{
EXEC SQL insert into t1 values (:i);
if(!checksql("insert", 0,EMPTYSTR))
return FUNCFAIL;
}
EXEC SQL commit work;
if (!checksql("commit work", 0,EMPTYSTR))
return FUNCFAIL;
return FUNCSUCC;
} /* populate_tab() */
/**********************************************************************
* Function: dr_dbs()
* Purpose : drops the database.
**********************************************************************/
long dr_dbs(db_name)
EXEC SQL BEGIN DECLARE SECTION;
char *db_name;
EXEC SQL END DECLARE SECTION;
{
EXEC SQL connect to DEFAULT;
checksql("dr_dbs: connect", 0, "DEFAULT");
EXEC SQL drop database :db_name;
checksql("dr_dbs: drop database", 0, EMPTYSTR);
EXEC SQL disconnect all;
checksql("dr_dbs: disconnect all", 0, EMPTYSTR);
} /*dr_dbs() */
/**********************************************************************
* Function: checksql()
* Purpose : To check the SQLCODE against the expected error
* (or the expected SQLCODE) and issue a proper message.
* Returns : FUNCSUCC on success & FUNCFAIL on FAILURE.
**********************************************************************/
int checksql(str, expected_err, con_name)
char *str;
long expected_err;
char *con_name;
{
if (SQLCODE != expected_err)
{
printf( "%s %s Returned {%d}, Expected {%d}\n", str, con_name,
SQLCODE,
expected_err);
return(FUNCFAIL);
}
return (FUNCSUCC);
} /* checksql() */
/**********************************************************************
* Function: dce_err()
* purpose : prints error from dce lib routines
* return : nothing
**********************************************************************/
void dce_err(program, routine, status)
char *program, *routine;
unsigned long status;
{
int dce_err_status;
char dce_err_string[dce_c_error_string_len+1];
if(status == (unsigned long)-1)
{
dce_err_status = 0;
sprintf(dce_err_string, "returned FAILURE (errno is %d)", errno);
}
else
dce_error_inq_text(status, (unsigned char *)dce_err_string,
&dce_err_status);
if(!dce_err_status)
{
fprintf(stderr, "%s: error in %s:\n ==> %s (%lu) <==\n",
program, routine, dce_err_string, status);
}
else
fprintf(stderr, "%s: error in %s: %lu\n", program, routine, status);
} /* dce_err() */