Threads are a widely supported abstraction for concurrent programming. Many modern operating systems have incorporated threads to take advantage of relatively low-cost multiprocessor hardware, and this is often the compelling reason to use threads in a software design. A concurrent, multithreaded program can fully utilize hardware platforms with two or more processors, whereas a sequential, single-threaded program cannot. Even on uniprocessor systems, multithreaded programs can improve throughput by overlapping processing and I/O requests without resorting to relatively complex and non-portable asynchronous I/O facilities. Threads have also long been recognized as a tool for meeting demanding software requirements for high availability and responsiveness. SmartSockets is designed to support both the multiple-threads-per-connection model and the single-thread-per-connection model of multithreaded applications.
The benefits of a multithreaded application, however, must be weighed carefully against its costs in resource consumption, efficiency, and program complexity. Because threads operate within a shared address space, their activities must be carefully synchronized to ensure that they do not interfere with one another. If an application does not lend itself to partitioning into discrete units of processing of a reasonable size, synchronization overhead can quickly outweigh the benefits of concurrency, even on multiprocessor hardware platforms. Threads are simply not appropriate for all programs.
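The need for synchronization in a shared address space can be illustrated with a minimal sketch. It uses native POSIX threads rather than the SmartSockets TutThread* and TutMutex* functions, and every name in it (shared_total, total_lock, add_worker, run_counter_demo) is hypothetical. Several threads increment one shared counter, and a mutex makes each read-modify-write step atomic.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Shared state: every thread in the process can reach these variables. */
static long shared_total = 0;
static pthread_mutex_t total_lock = PTHREAD_MUTEX_INITIALIZER;

/* Each worker adds 1 to the shared total `iterations` times. */
static void *add_worker(void *arg)
{
    long iterations = *(long *)arg;
    long i;

    for (i = 0; i < iterations; i++) {
        pthread_mutex_lock(&total_lock);   /* serialize the read-modify-write */
        shared_total++;
        pthread_mutex_unlock(&total_lock);
    }
    return NULL;
}

/* Run `thread_count` workers and return the final total. */
long run_counter_demo(int thread_count, long iterations)
{
    pthread_t *threads = malloc(sizeof(pthread_t) * (size_t)thread_count);
    long result;
    int i;

    assert(threads != NULL);
    shared_total = 0;
    for (i = 0; i < thread_count; i++) {
        int rc = pthread_create(&threads[i], NULL, add_worker, &iterations);
        assert(rc == 0);
    }
    for (i = 0; i < thread_count; i++) {
        pthread_join(threads[i], NULL);
    }
    result = shared_total;
    free(threads);
    return result;
}
```

Calling run_counter_demo(4, 10000) returns 40000 because every increment is protected. Without the lock the result could be lower, and the lock itself is the synchronization overhead that the paragraph above warns about when the unit of work is this small.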
It is worth noting, however, that SmartSockets applications often do lend themselves well to multithreaded implementations. This is particularly true of server processes, which may get multiple requests from independent clients simultaneously. These requests must be handled serially if the server has only one thread of control, allowing long requests to block the servicing of other pending requests. Multithreaded processes can be more adaptive to such variations, allowing short requests to complete out of sequence. The use of multiple threads also offers an alternative to the UNIX convention of forking a child process to deal with each new client.
SmartSockets connections are designed to serve as high-level synchronization objects in multithreaded applications. Each connection has a set of mutexes (mutual exclusion locks) that are used to ensure that threads sharing the same connection do not interfere with each other, yet can operate concurrently where there is no chance of interference. For instance, one thread may be sending a message on a connection, while a second thread is retrieving the next received message from the connection’s queue, and a third and fourth thread are both running one of the connection’s process callbacks.
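The idea of several mutexes per connection can be sketched as follows. This is not the SmartSockets implementation, whose internal layout is not shown in this guide; demo_conn and all of its fields are hypothetical, and POSIX threads stand in for the portable SmartSockets functions. A sending thread and two reading threads share one connection object. The sender takes only the lock for the outgoing side, so it never waits for the readers, while the two readers serialize against each other on the queue lock.

```c
#include <assert.h>
#include <pthread.h>

#define DEMO_MSG_COUNT 1000

/* Hypothetical stand-in for a connection; not the SmartSockets layout. */
typedef struct {
    pthread_mutex_t send_lock;   /* guards the outgoing side */
    pthread_mutex_t queue_lock;  /* guards the queue of received messages */
    int sent;                    /* messages written so far */
    int queued;                  /* received messages waiting to be read */
} demo_conn;

/* Sender: touches only the outgoing side, so it takes only send_lock. */
static void *demo_sender(void *arg)
{
    demo_conn *conn = arg;
    int i;

    for (i = 0; i < DEMO_MSG_COUNT; i++) {
        pthread_mutex_lock(&conn->send_lock);
        conn->sent++;
        pthread_mutex_unlock(&conn->send_lock);
    }
    return NULL;
}

/* Reader: drains the receive queue, so it takes only queue_lock. */
static void *demo_reader(void *arg)
{
    demo_conn *conn = arg;
    int more = 1;

    while (more) {
        pthread_mutex_lock(&conn->queue_lock);
        if (conn->queued > 0) {
            conn->queued--;
        } else {
            more = 0;
        }
        pthread_mutex_unlock(&conn->queue_lock);
    }
    return NULL;
}

/* Returns 0 when all threads finished with consistent counters. */
int run_conn_demo(void)
{
    demo_conn conn;
    pthread_t sender, reader_a, reader_b;
    int rc;

    pthread_mutex_init(&conn.send_lock, NULL);
    pthread_mutex_init(&conn.queue_lock, NULL);
    conn.sent = 0;
    conn.queued = DEMO_MSG_COUNT;

    rc = pthread_create(&sender, NULL, demo_sender, &conn);
    assert(rc == 0);
    rc = pthread_create(&reader_a, NULL, demo_reader, &conn);
    assert(rc == 0);
    rc = pthread_create(&reader_b, NULL, demo_reader, &conn);
    assert(rc == 0);

    pthread_join(sender, NULL);
    pthread_join(reader_a, NULL);
    pthread_join(reader_b, NULL);

    pthread_mutex_destroy(&conn.send_lock);
    pthread_mutex_destroy(&conn.queue_lock);
    return (conn.sent == DEMO_MSG_COUNT && conn.queued == 0) ? 0 : 1;
}
```

A single lock around the whole structure would also be correct, but it would force the sender to wait whenever a reader held the lock. Splitting the locks by resource is what allows the concurrency described above.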
The following server and client examples demonstrate how to write a connection-based server program that handles multiple simultaneous clients by using a separate thread for each client connection. To simplify the example, a fixed number of server threads (controlled by #define SERVER_COUNT 2) are created in advance. This determines the maximum number of client connections that may be simultaneously serviced. Should more than SERVER_COUNT clients attempt to connect to the server, excess clients would be unable to successfully complete their connection until one of the server threads becomes available.
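The effect of a fixed number of server threads can be sketched without any networking. In the following illustration, written with POSIX threads, waiting clients are reduced to a counter and all names (demo_pool, pool_thread, run_pool_demo) are hypothetical. Unlike the example server, where each thread accepts exactly two clients, each thread here keeps taking clients until none remain. The sketch records the largest number of clients that were ever in service at once.

```c
#include <assert.h>
#include <pthread.h>

#define SERVER_COUNT 2   /* same role as in the example server */

typedef struct {
    pthread_mutex_t lock;
    int pending;      /* clients still waiting to be accepted */
    int active;       /* clients being served right now */
    int max_active;   /* highest value `active` ever reached */
    int served;       /* clients fully served */
} demo_pool;

static void *pool_thread(void *arg)
{
    demo_pool *pool = arg;

    for (;;) {
        /* "Accept": take one waiting client, or stop if none remain. */
        pthread_mutex_lock(&pool->lock);
        if (pool->pending == 0) {
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }
        pool->pending--;
        pool->active++;
        if (pool->active > pool->max_active) {
            pool->max_active = pool->active;
        }
        pthread_mutex_unlock(&pool->lock);

        /* ... the client's messages would be processed here ... */

        /* "Destroy": this thread is now available for the next client. */
        pthread_mutex_lock(&pool->lock);
        pool->active--;
        pool->served++;
        pthread_mutex_unlock(&pool->lock);
    }
}

/* Serve `client_count` clients; returns how many were served. */
int run_pool_demo(int client_count, int *max_active)
{
    demo_pool pool;
    pthread_t threads[SERVER_COUNT];
    int i;

    pthread_mutex_init(&pool.lock, NULL);
    pool.pending = client_count;
    pool.active = 0;
    pool.max_active = 0;
    pool.served = 0;

    for (i = 0; i < SERVER_COUNT; i++) {
        int rc = pthread_create(&threads[i], NULL, pool_thread, &pool);
        assert(rc == 0);
    }
    for (i = 0; i < SERVER_COUNT; i++) {
        pthread_join(threads[i], NULL);
    }
    *max_active = pool.max_active;
    pthread_mutex_destroy(&pool.lock);
    return pool.served;
}
```

With five clients, all five are eventually served, but the recorded peak never exceeds SERVER_COUNT. The remaining clients simply wait until a thread becomes free.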
There is also an example that extends this server to use multiple threads for each client connection. This allows multiple messages from the same client to be processed in parallel instead of serially, thus substantially improving response times. See Adding Multiple Threads for a Client for details.
The source code files for this example are located in these directories:
The online source files have additional #ifdefs to provide C++ support. These #ifdefs are not shown to simplify the example.
/* connmts1.c -- multithreaded connections example server 1 */
/*
This program uses multiple server threads which allows multiple clients to connect and submit messages simultaneously.
*/
#include <rtworks/ipc.h>

#define SERVER_COUNT 2

T_IPC_CONN server_conn;
T_TSD_KEY id_key = T_INVALID_TSD_KEY;

/* =============================================================== */
/*..cb_process_numeric_data -- process callback for NUMERIC_DATA */
void T_ENTRY cb_process_numeric_data(
  T_IPC_CONN conn,
  T_IPC_CONN_PROCESS_CB_DATA data,
  T_CB_ARG arg
)
{
  T_STR id_str;
  T_INT4 i;
  T_STR var_name[3];
  T_REAL8 var_value[3];
  T_REAL8 in_time;
  T_REAL8 out_time;
  T_STR the_time = NULL;

  if (!TutTsdGetValue(id_key, &id_str)) {
    TutOut("Could not get TSD value for thread: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  /* set current field to first field in message */
  if (!TipcMsgSetCurrent(data->msg, 0)) {
    TutOut("%s: Could not set current field of msg: error <%s>.\n",
           id_str, TutErrStrGet());
    TutThreadExit(NULL);
  }

  for (i = 0; i < 3; i++) {
    if (!TipcMsgNextStrReal8(data->msg, &var_name[i], &var_value[i])) {
      TutOut("%s: Could not parse NUMERIC_DATA msg: error <%s>.\n",
             id_str, TutErrStrGet());
      TutThreadExit(NULL);
    }
  }

  in_time = TutGetWallTime();
  the_time = TutStrDup(TutTimeNumToStr(in_time));
  TutOut("%s: (%s %d %s %s %s %s) at %s\n",
         id_str,
         var_name[0], (T_INT4)var_value[0],
         var_name[1], TutRealToStr(var_value[1]),
         var_name[2], TutTimeNumToStr(var_value[2]),
         the_time);
  TutFree(the_time);

  TutSleep(var_value[1]);
  out_time = TutGetWallTime();

  if (!TipcMsgWrite(data->msg,
                    T_IPC_FT_STR, "in-time",
                    T_IPC_FT_REAL8, in_time,
                    T_IPC_FT_STR, "out-time",
                    T_IPC_FT_REAL8, out_time,
                    NULL)) {
    TutOut("%s: Could not append to NUMERIC_DATA msg: error <%s>.\n",
           id_str, TutErrStrGet());
    TutThreadExit(NULL);
  }

  if (!TipcConnMsgSend(conn, data->msg)) {
    TutOut("%s: Could not send NUMERIC_DATA msg: error <%s>.\n",
           id_str, TutErrStrGet());
    TutThreadExit(NULL);
  }

  if (!TipcConnFlush(conn)) {
    TutOut("%s: Could not flush conn to client: error <%s>.\n",
           id_str, TutErrStrGet());
    TutThreadExit(NULL);
  }
} /* cb_process_numeric_data */

/* =============================================================== */
/*..server_thread -- thread function for server threads */
T_PTR T_ENTRY server_thread(arg)
T_PTR arg;
{
  T_STRING id_str;
  T_IPC_MT mt;
  T_IPC_CONN client_conn;
  T_INT4 i;

  sprintf(id_str, "%d", *(T_INT4 *)arg);

  if (!TutTsdSetValue(id_key, id_str)) {
    TutOut("Could not set TSD value for thread: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  mt = TipcMtLookupByNum(T_MT_NUMERIC_DATA);
  if (NULL == mt) {
    TutOut("%s: Could not look up NUMERIC_DATA mt: error <%s>.\n",
           id_str, TutErrStrGet());
    TutThreadExit(NULL);
  }

  for (i = 0; i < 2; i++) {
    client_conn = TipcConnAccept(server_conn);
    if (NULL == client_conn) {
      TutOut("%s: Could not accept client: error <%s>.\n",
             id_str, TutErrStrGet());
      TutThreadExit(NULL);
    }
    TutOut("%s: Accepted client connection %d.\n", id_str, i + 1);

    if (TipcConnProcessCbCreate(client_conn, mt,
                                cb_process_numeric_data, arg) == NULL) {
      TutOut("%s: Could not create NUMERIC_DATA process cb.\n", id_str);
      TutOut(" error <%s>.\n", TutErrStrGet());
      TutThreadExit(NULL);
    }

    if (!TipcConnMainLoop(client_conn, T_TIMEOUT_FOREVER)) {
      /* make sure we reached the end of the data */
      if (TutErrNumGet() != T_ERR_EOF) {
        TutOut("%s: Did not reach end of data: error <%s>.\n",
               id_str, TutErrStrGet());
      }
    }

    TutOut("%s: Destroying client connection %d.\n", id_str, i + 1);
    if (!TipcConnDestroy(client_conn)) {
      TutOut("%s: Could not destroy client conn: error <%s>.\n",
             id_str, TutErrStrGet());
    }
  }

  return NULL;
} /* server_thread */

/* =============================================================== */
/*..main -- main program */
int main()
{
  T_THREAD thread[SERVER_COUNT];
  T_INT4 id_args[SERVER_COUNT];
  T_INT4 i;

  if (!TipcInitThreads()) {
    TutOut("This platform does not support threads: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  if (!TutTsdKeyCreate(&id_key, 0)) {
    TutOut("Could not create TSD key: error <%s>.\n", TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  TutOut("Creating server connection.\n");
  server_conn = TipcConnCreateServer("tcp:_node:4000");
  if (NULL == server_conn) {
    TutOut("Could not create server connection: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  for (i = 0; i < SERVER_COUNT; i++) {
    id_args[i] = i + 1;
    thread[i] = TutThreadCreate((T_THREAD_FUNC)server_thread,
                                &id_args[i], NULL);
    if (TutThreadEqual(T_INVALID_THREAD, thread[i])) {
      TutOut("Could not create thread %d: error <%s>.\n",
             i, TutErrStrGet());
      TutExit(T_EXIT_FAILURE);
    }
  }

  for (i = 0; i < SERVER_COUNT; i++) {
    if (!TutThreadWait(thread[i], NULL)) {
      TutOut("Could not wait for thread %d: error <%s>.\n",
             i, TutErrStrGet());
      TutExit(T_EXIT_FAILURE);
    }
  }

  TutOut("Destroying server connection.\n");
  if (!TipcConnDestroy(server_conn)) {
    TutOut("Could not destroy server connection: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  return T_EXIT_SUCCESS;
} /* main */
/* connmtc.c -- multithreaded connections example client */
/*
This program connects to the server process, submits several messages at once, and then outputs their individual response times.
*/
#include <rtworks/ipc.h>

#define T_NUM_JOB_TIMES 5

T_REAL8 job_times[] = { 1.0, 2.0, 7.0, 2.0, 1.0 };

/* =============================================================== */
/*..cb_process_numeric_data -- process callback for NUMERIC_DATA */
void T_ENTRY cb_process_numeric_data(
  T_IPC_CONN conn,
  T_IPC_CONN_PROCESS_CB_DATA data,
  T_CB_ARG arg
)
{
  T_INT4 i;
  T_STR var_name[5];
  T_REAL8 var_value[5];

  /* set current field to first field in message */
  if (!TipcMsgSetCurrent(data->msg, 0)) {
    TutOut("Could not set current field of message: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  for (i = 0; i < 5; i++) {
    if (!TipcMsgNextStrReal8(data->msg, &var_name[i], &var_value[i])) {
      TutOut("Could not parse NUMERIC_DATA message: error <%s>.\n",
             TutErrStrGet());
      TutExit(T_EXIT_FAILURE);
    }
  }

  TutOut("Reply (%s %d %s %s %s %s\n",
         var_name[0], (T_INT4)var_value[0],
         var_name[1], TutRealToStr(var_value[1]),
         var_name[2], TutTimeNumToStr(var_value[2]));
  TutOut(" %s %s ", var_name[3], TutTimeNumToStr(var_value[3]));
  TutOut("%s %s) elapsed %s seconds\n",
         var_name[4], TutTimeNumToStr(var_value[4]),
         TutRealToStr(var_value[4] - var_value[2]));
} /* cb_process_numeric_data */

/* =============================================================== */
/*..main -- main program */
int main()
{
  T_IPC_CONN conn;
  T_IPC_MT mt;
  T_IPC_MSG msg;
  T_INT4 i;

  TutOut("Creating connection to server process.\n");
  conn = TipcConnCreateClient("tcp:_node:4000");
  if (NULL == conn) {
    TutOut("Could not connect to server process: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  mt = TipcMtLookupByNum(T_MT_NUMERIC_DATA);
  if (NULL == mt) {
    TutOut("Could not look up NUMERIC_DATA msg type: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  if (TipcConnProcessCbCreate(conn, mt,
                              cb_process_numeric_data, NULL) == NULL) {
    TutOut("Could not create NUMERIC_DATA process cb: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  msg = TipcMsgCreate(mt);
  if (NULL == msg) {
    TutOut("Could not create NUMERIC_DATA message: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  for (i = 0; i < T_NUM_JOB_TIMES; i++) {
    if (!TipcMsgWrite(msg,
                      T_IPC_FT_STR, "message-no",
                      T_IPC_FT_REAL8, (T_REAL8)i + 1.0,
                      T_IPC_FT_STR, "job-time",
                      T_IPC_FT_REAL8, job_times[i],
                      T_IPC_FT_STR, "submit-time",
                      T_IPC_FT_REAL8, TutGetWallTime(),
                      NULL)) {
      TutOut("Could not append to NUMERIC_DATA msg: error <%s>.\n",
             TutErrStrGet());
      TutExit(T_EXIT_FAILURE);
    }

    if (!TipcConnMsgSend(conn, msg)) {
      TutOut("Could not send NUMERIC_DATA message: error <%s>.\n",
             TutErrStrGet());
      TutExit(T_EXIT_FAILURE);
    }

    if (!TipcMsgSetNumFields(msg, 0)) {
      TutOut("Could not truncate NUMERIC_DATA message: error <%s>.\n",
             TutErrStrGet());
      TutExit(T_EXIT_FAILURE);
    }
  }

  if (!TipcMsgDestroy(msg)) {
    TutOut("Could not destroy NUMERIC_DATA msg: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  if (!TipcConnFlush(conn)) {
    TutOut("Could not flush connection to server: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  if (!TipcConnMainLoop(conn, 15.0)) {
    TutOut("Could not run main loop to server: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  TutOut("Destroying connection to server process.\n");
  if (!TipcConnDestroy(conn)) {
    TutOut("Could not destroy connection to server: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  return T_EXIT_SUCCESS;
} /* main */
The client program listed above is an excellent candidate for use with a server that processes multiple client messages concurrently (for example, by using multiple threads) because:
The first example server program processes messages from different client connections concurrently, but messages from an individual client are still handled one at a time. By introducing multiple threads for each client connection, overall server throughput can be considerably increased. The built-in synchronization capabilities of the connection make the required modifications straightforward. The relevant portions of the second server program, connmts2.c, are shown below.
For the purposes of this example, each server thread creates a fixed number of helper threads (HELPER_COUNT) for its client connection.
This function is executed by each of the helper threads:
/* =============================================================== */
/*..helper_thread -- thread function for helper threads */
T_PTR T_ENTRY helper_thread(arg)
T_PTR arg;
{
  T_PTR *argv = (T_PTR *)arg;
  T_STR id_str = argv[0];
  T_IPC_CONN client_conn = argv[1];

  TutFree(argv);

  if (!TutTsdSetValue(id_key, id_str)) {
    TutOut("Could not set TSD value for thread: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  if (!TipcConnMainLoop(client_conn, T_TIMEOUT_FOREVER)) {
    TutOut("Could not run main loop for thread: error <%s>.\n",
           TutErrStrGet());
    TutExit(T_EXIT_FAILURE);
  }

  return NULL;
} /* helper_thread */
The server_thread function is modified so that it creates the helper threads when the server thread is connected to a client, and waits for them to exit when the client closes the connection:
/* =============================================================== */
/*..server_thread -- thread function for server threads */
T_PTR T_ENTRY server_thread(arg)
T_PTR arg;
{
  T_STRING id_str;
  T_IPC_MT mt;
  T_IPC_CONN client_conn;
  T_INT4 i;
  T_INT4 j;
  T_THREAD thread[HELPER_COUNT];
  T_STRING id_strs[HELPER_COUNT];

  sprintf(id_str, "%d'", *(T_INT4 *)arg);
  ...
  for (i = 0; i < 2; i++) {
    ...
    for (j = 0; j < HELPER_COUNT; j++) {
      T_PTR *argv = (T_PTR *)TutMalloc(sizeof(T_PTR) * 2);

      sprintf(id_strs[j], "%d%c", *(T_INT4 *)arg, 'a' + j);
      argv[0] = &id_strs[j];
      argv[1] = client_conn;
      thread[j] = TutThreadCreate((T_THREAD_FUNC)helper_thread, argv, NULL);
      if (TutThreadEqual(T_INVALID_THREAD, thread[j])) {
        TutOut("Could not create thread %s: error <%s>.\n",
               id_strs[j], TutErrStrGet());
        TutExit(T_EXIT_FAILURE);
      }
    }

    if (!TipcConnMainLoop(client_conn, T_TIMEOUT_FOREVER)) {
      /* make sure we reached the end of the data */
      if (TutErrNumGet() != T_ERR_EOF) {
        TutOut("%s: Did not reach end of data: error <%s>.\n",
               id_str, TutErrStrGet());
      }
    }

    for (j = 0; j < HELPER_COUNT; j++) {
      if (!TutThreadWait(thread[j], NULL)) {
        TutOut("Could not wait for thread %s: error <%s>.\n",
               id_strs[j], TutErrStrGet());
        TutExit(T_EXIT_FAILURE);
      }
    }

    TutOut("%s: Destroying client connection %d.\n", id_str, i + 1);
    if (!TipcConnDestroy(client_conn)) {
      TutOut("%s: Could not destroy client conn: error <%s>.\n",
             id_str, TutErrStrGet());
    }
  }

  return NULL;
} /* server_thread */
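The helper threads above receive their arguments in a small heap-allocated array that the creating thread allocates and the helper frees. The same ownership hand-off can be sketched with POSIX threads and malloc/free standing in for TutThreadCreate, TutMalloc, and TutFree. The names helper_args, demo_helper, and run_helper_demo are hypothetical, and HELPER_COUNT is set to 2 here only as an assumption, since the excerpt does not show its definition.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#define HELPER_COUNT 2   /* assumed value; the excerpt does not show it */

/* Argument pack handed from the creating thread to one helper. */
typedef struct {
    int helper_index;   /* which helper this is */
    int *results;       /* shared output array, one slot per helper */
} helper_args;

static void *demo_helper(void *arg)
{
    helper_args *args = arg;
    int index = args->helper_index;
    int *results = args->results;

    /* Copy what is needed, then free: the helper owns the pack. */
    free(args);

    results[index] = index + 1;   /* each helper writes only its own slot */
    return NULL;
}

/* Start the helpers, wait for them, and return how many ran. */
int run_helper_demo(int *results)
{
    pthread_t threads[HELPER_COUNT];
    int j;

    for (j = 0; j < HELPER_COUNT; j++) {
        /* A separate allocation per helper avoids sharing a loop variable. */
        helper_args *args = malloc(sizeof(*args));
        int rc;

        assert(args != NULL);
        args->helper_index = j;
        args->results = results;
        rc = pthread_create(&threads[j], NULL, demo_helper, args);
        assert(rc == 0);
    }
    for (j = 0; j < HELPER_COUNT; j++) {
        pthread_join(threads[j], NULL);
    }
    return HELPER_COUNT;
}
```

Giving each helper its own allocation means the creating thread can move on to the next loop iteration immediately. Passing the address of a single local variable that changes on every iteration would instead be a data race.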
To compile, link, and run the example programs, first you must either copy the programs to your own directory or have write permission in one of these directories:
Compile and link the programs
$ cc connmts1.c
$ rtlink /exec=connmts1.exe connmts1.obj
$ cc connmtc.c
$ rtlink /exec=connmtc.exe connmtc.obj
$ cc connmts2.c
$ rtlink /exec=connmts2.exe connmts2.obj
To run the programs, start the first or second server process first in one terminal emulator window and then the client process in another terminal emulator window.
Start the first server program in the first window
To start the second server program instead of the first server program, replace s1 with s2.
Start the client program in the second window
This is an example of the output from the first server process:
Creating server connection.
1: Accepted client connection 1.
1: (message-no 1 job-time 1 submit-time 13:10:16) at 13:10:16
1: (message-no 2 job-time 2 submit-time 13:10:16) at 13:10:17
1: (message-no 3 job-time 7 submit-time 13:10:16) at 13:10:19
2: Accepted client connection 1.
2: (message-no 1 job-time 1 submit-time 13:10:22) at 13:10:23
2: (message-no 2 job-time 2 submit-time 13:10:22) at 13:10:24
2: (message-no 3 job-time 7 submit-time 13:10:22) at 13:10:26
1: (message-no 4 job-time 2 submit-time 13:10:16) at 13:10:26
1: (message-no 5 job-time 1 submit-time 13:10:16) at 13:10:29
1: Destroying client connection 1.
1: Accepted client connection 2.
1: (message-no 1 job-time 1 submit-time 13:10:32) at 13:10:32
2: (message-no 4 job-time 2 submit-time 13:10:22) at 13:10:33
1: (message-no 2 job-time 2 submit-time 13:10:32) at 13:10:33
2: (message-no 5 job-time 1 submit-time 13:10:22) at 13:10:35
1: (message-no 3 job-time 7 submit-time 13:10:32) at 13:10:35
2: Destroying client connection 1.
2: Accepted client connection 2.
2: (message-no 1 job-time 1 submit-time 13:10:40) at 13:10:40
2: (message-no 2 job-time 2 submit-time 13:10:40) at 13:10:41
1: (message-no 4 job-time 2 submit-time 13:10:32) at 13:10:42
2: (message-no 3 job-time 7 submit-time 13:10:40) at 13:10:43
1: (message-no 5 job-time 1 submit-time 13:10:32) at 13:10:44
1: Destroying client connection 2.
2: (message-no 4 job-time 2 submit-time 13:10:40) at 13:10:50
2: (message-no 5 job-time 1 submit-time 13:10:40) at 13:10:52
2: Destroying client connection 2.
Destroying server connection.
Here is an example of the output from each of the client processes connected to the first server:
Creating connection to server process.
Reply (message-no 1 job-time 1 submit-time 13:10:16
 in-time 13:10:16 out-time 13:10:17) elapsed 1.122 seconds
Reply (message-no 2 job-time 2 submit-time 13:10:16
 in-time 13:10:17 out-time 13:10:19) elapsed 3.105 seconds
Reply (message-no 3 job-time 7 submit-time 13:10:16
 in-time 13:10:19 out-time 13:10:26) elapsed 10.105 seconds
Reply (message-no 4 job-time 2 submit-time 13:10:16
 in-time 13:10:26 out-time 13:10:29) elapsed 12.108 seconds
Reply (message-no 5 job-time 1 submit-time 13:10:16
 in-time 13:10:29 out-time 13:10:30) elapsed 13.129 seconds
Destroying connection to server process.
Here is an example of the output from the second server process:
Creating server connection.
1': Accepted client connection 1.
1': (message-no 1 job-time 1 submit-time 13:12:21) at 13:12:21
1a: (message-no 2 job-time 2 submit-time 13:12:21) at 13:12:21
1b: (message-no 3 job-time 7 submit-time 13:12:21) at 13:12:21
1': (message-no 4 job-time 2 submit-time 13:12:21) at 13:12:22
2': Accepted client connection 1.
2': (message-no 1 job-time 1 submit-time 13:12:23) at 13:12:23
2a: (message-no 2 job-time 2 submit-time 13:12:23) at 13:12:23
2b: (message-no 3 job-time 7 submit-time 13:12:23) at 13:12:23
1a: (message-no 5 job-time 1 submit-time 13:12:21) at 13:12:23
2': (message-no 4 job-time 2 submit-time 13:12:23) at 13:12:24
2a: (message-no 5 job-time 1 submit-time 13:12:23) at 13:12:25
1': Destroying client connection 1.
1': Accepted client connection 2.
1': (message-no 1 job-time 1 submit-time 13:12:36) at 13:12:36
1a: (message-no 2 job-time 2 submit-time 13:12:36) at 13:12:36
1b: (message-no 3 job-time 7 submit-time 13:12:36) at 13:12:36
1': (message-no 4 job-time 2 submit-time 13:12:36) at 13:12:37
2': Destroying client connection 1.
1a: (message-no 5 job-time 1 submit-time 13:12:36) at 13:12:38
2': Accepted client connection 2.
2': (message-no 1 job-time 1 submit-time 13:12:39) at 13:12:40
2a: (message-no 2 job-time 2 submit-time 13:12:39) at 13:12:40
2b: (message-no 3 job-time 7 submit-time 13:12:39) at 13:12:40
2': (message-no 4 job-time 2 submit-time 13:12:39) at 13:12:41
2a: (message-no 5 job-time 1 submit-time 13:12:39) at 13:12:42
1': Destroying client connection 2.
2': Destroying client connection 2.
Destroying server connection.
Here is an example of the output from each of the client processes connected to the second server: (Note the reduction in message response elapsed times.)
Creating connection to server process.
Reply (message-no 1 job-time 1 submit-time 13:12:21
 in-time 13:12:21 out-time 13:12:22) elapsed 1.152 seconds
Reply (message-no 2 job-time 2 submit-time 13:12:21
 in-time 13:12:21 out-time 13:12:23) elapsed 2.153 seconds
Reply (message-no 4 job-time 2 submit-time 13:12:21
 in-time 13:12:22 out-time 13:12:24) elapsed 3.155 seconds
Reply (message-no 5 job-time 1 submit-time 13:12:21
 in-time 13:12:23 out-time 13:12:24) elapsed 3.155 seconds
Reply (message-no 3 job-time 7 submit-time 13:12:21
 in-time 13:12:21 out-time 13:12:28) elapsed 7.15 seconds
Destroying connection to server process.
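The elapsed times in the two client outputs can be approximated with a small scheduling calculation. The sketch below is a simplified model, not part of the SmartSockets examples: it assumes all five messages are submitted at time 0, that each message is handed, in order, to whichever thread becomes free first, and that communication overhead is zero. The function name simulate_finish_times and the MAX_WORKERS limit are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_WORKERS 8   /* arbitrary limit for this sketch */

/*
 * Assign jobs, in order, to whichever worker becomes free first and record
 * when each job finishes. All jobs are assumed to be submitted at time 0.
 * Returns 0 on success, -1 if worker_count is out of range.
 */
int simulate_finish_times(const double *job_times, size_t job_count,
                          int worker_count, double *finish_times)
{
    double free_at[MAX_WORKERS] = { 0.0 };   /* when each worker is idle */
    size_t j;
    int w;

    assert(job_times != NULL && finish_times != NULL);
    if (worker_count < 1 || worker_count > MAX_WORKERS) {
        return -1;
    }
    for (j = 0; j < job_count; j++) {
        int best = 0;

        /* Pick the worker that becomes free earliest (lowest index on ties). */
        for (w = 1; w < worker_count; w++) {
            if (free_at[w] < free_at[best]) {
                best = w;
            }
        }
        free_at[best] += job_times[j];
        finish_times[j] = free_at[best];
    }
    return 0;
}
```

For the job times 1, 2, 7, 2, 1 used by the client, one worker gives finish times of 1, 3, 10, 12, and 13 seconds, which agrees with the first server's elapsed times of roughly 1.1, 3.1, 10.1, 12.1, and 13.1 seconds. Three workers (the server thread plus the two helpers suggested by the identifiers 1', 1a, and 1b in the output) give 1, 2, 7, 3, and 3 seconds, which agrees with the second server's 1.15, 2.15, 7.15, 3.155, and 3.155 seconds.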
Connection programs that call the Tipc* API functions from more than one thread must first call TipcInitThreads, even if the program does not use any SmartSockets threads features. See the reference page for TipcInitThreads in the TIBCO SmartSockets Application Programming Interface reference for more details. For example, the main function of connmts1.c above calls TipcInitThreads before it creates any threads or connections.
Connection programs are free to mix and match the portable TutThread*, TutMutex*, TutCond*, and TutTsd* functions with the non-portable native threads functions, such as the POSIX pthreads functions or Win32 threads functions. For more information on the SmartSockets portable multithreading functions, see the TIBCO SmartSockets Utilities reference.
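As an illustration of the native side, the following sketch shows thread-specific data with POSIX pthreads, used the same way the example server uses TutTsdKeyCreate, TutTsdSetValue, and TutTsdGetValue to give each thread its own identifier string. It is not a statement about how the TutTsd* functions are implemented. The names tsd_arg, tsd_thread, and run_tsd_demo are hypothetical, and id_key here is a pthread_key_t rather than a T_TSD_KEY.

```c
#include <assert.h>
#include <pthread.h>
#include <string.h>

static pthread_key_t id_key;   /* plays the role of id_key in the example */

typedef struct {
    const char *id_str;   /* identifier this thread should store */
    int ok;               /* set to 1 if the thread read its own value back */
} tsd_arg;

/* Reads the calling thread's own value back out of the key. */
static int id_matches(const char *expected)
{
    const char *stored = pthread_getspecific(id_key);

    return stored != NULL && strcmp(stored, expected) == 0;
}

static void *tsd_thread(void *arg)
{
    tsd_arg *ta = arg;

    /* Each thread stores a different pointer under the same key. */
    if (pthread_setspecific(id_key, ta->id_str) != 0) {
        ta->ok = 0;
        return NULL;
    }
    ta->ok = id_matches(ta->id_str);
    return NULL;
}

/* Returns 0 if both threads saw only their own identifier. */
int run_tsd_demo(void)
{
    tsd_arg args[2] = { { "1", 0 }, { "2", 0 } };
    pthread_t threads[2];
    int i;
    int rc;

    rc = pthread_key_create(&id_key, NULL);
    assert(rc == 0);
    for (i = 0; i < 2; i++) {
        rc = pthread_create(&threads[i], NULL, tsd_thread, &args[i]);
        assert(rc == 0);
    }
    for (i = 0; i < 2; i++) {
        pthread_join(threads[i], NULL);
    }
    pthread_key_delete(id_key);
    return (args[0].ok && args[1].ok) ? 0 : 1;
}
```

Because the key is looked up per thread, a callback such as cb_process_numeric_data can find the identifier of whichever thread happens to be running it without any extra argument passing or locking.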
The primitive synchronization properties within each connection are used individually or in combinations by the TipcConn* API functions to ensure that threads sharing the connection do not interfere with one another or corrupt the connection’s internal state.
TIBCO SmartSockets™ User’s Guide Software Release 6.8, July 2006 Copyright © TIBCO Software Inc. All rights reserved www.tibco.com