Main Page   Alphabetical List   Data Structures   File List   Data Fields   Globals  

tuple_server.c

Go to the documentation of this file.
00001 /*****************************************************************
00002  *
00003  * LinuxTuples - an open-source tuple space for Linux clusters
00004  * Copyright (c) 2003, Will Ware <wware@alum.mit.edu>
00005  * All rights reserved.
00006  *
00007  *    Redistribution and use in source and binary forms, with or
00008  *    without modification, are permitted provided that the following
00009  *    conditions are met:
00010  *
00011  *    + Redistributions of source code must retain the above copyright
00012  *    notice, this list of conditions and the following disclaimer.
00013  *
00014  *    + Redistributions in binary form must reproduce the above
00015  *    copyright notice, this list of conditions and the following
00016  *    disclaimer in the documentation and/or other materials provided
00017  *    with the distribution.
00018  *
00019  *    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
00020  *    CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
00021  *    INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
00022  *    MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00023  *    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
00024  *    CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00025  *    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00026  *    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
00027  *    USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
00028  *    AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029  *    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
00030  *    IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
00031  *    THE POSSIBILITY OF SUCH DAMAGE.
00032  *
00033  ******************************************************************/
00034 
00047 #include <stdio.h>
00048 #include <stdarg.h>
00049 #include <string.h>
00050 #include <stdlib.h>
00051 #include <malloc.h>
00052 #include <unistd.h>
00053 #include <fcntl.h>
00054 #include <errno.h>
00055 #include <sys/socket.h>
00056 #include <arpa/inet.h>
00057 #include <pthread.h>
00058 #include <semaphore.h>
00059 
00060 #include "tuple.h"
00061 
00062 sem_t log_sem;
00063 
00064 #define LOGPRINTF(fmt,a...)   \
00065    if (logptr < 8000) { logptr += sprintf(logbuf+logptr, fmt, ##a); }
00066 
00067 #define LOGTUPLE(s)    print_tuple(s);
00068 
00069 
00074 struct ttuple {
00075         struct tuple *tuple;
00076         struct ttuple *next;
00077         struct ttuple *previous;
00078 };
00079 
00084 struct ttuple *first_message = NULL;
00085 
00090 struct ttuple *last_message = NULL;
00091 
00092 
00093 
00094 #define MAXNUMTHREADS 100
00095 
00100 struct context client_list[MAXNUMTHREADS];
00101 
00102 
00108 sem_t tuple_space_access;
00109 
00114 unsigned char num_blocked = 0;
00115 
00119 sem_t blocked_sem;
00120 
00125 sem_t new_client_sem;
00126 
00127 
00133 static void
00134 add_tuple_to_space(struct tuple *t)
00135 {
00136         struct ttuple *s;
00137         s = malloc(sizeof(struct ttuple));
00138         if (s == NULL) {
00139                 perror("malloc failed");
00140                 exit(1);
00141         }
00142         s->tuple = t;
00143         s->next = NULL;
00144         sem_wait(&tuple_space_access);
00145 
00146         if (last_message == NULL)
00147                 first_message = s;
00148         else
00149                 last_message->next = s;
00150         s->previous = last_message;
00151         last_message = s;
00152         
00153         while (num_blocked) {
00154                 sem_post(&blocked_sem);
00155                 num_blocked--;
00156         }
00157         sem_post(&tuple_space_access);
00158 }
00159 
00160 
00161 
00166 static void
00167 remove_message_from_space(struct ttuple *s)
00168 {
00169         if (s->previous == NULL && s->next == NULL) {
00170                 /* I am the only message */
00171                 first_message = last_message = NULL;
00172         }
00173         else if (s->next == NULL) {
00174                 /* I am the last message */
00175                 last_message = s->previous;
00176                 last_message->next = NULL;
00177         }
00178         else if (s->previous == NULL) {
00179                 /* I am the first message */
00180                 first_message = s->next;
00181                 first_message->previous = NULL;
00182         }
00183         else {
00184                 /* I have both a previous and a next */
00185                 s->previous->next = s->next;
00186                 s->next->previous = s->previous;
00187         }
00188 }
00189 
00190 
00191 
00195 static void
00196 kill_this_thread(struct context *arg)
00197 {
00198         LOGPRINTF("kill thread for socket %d\n", arg->sock);
00199         arg->thr = -1;
00200         close(arg->sock);
00201         pthread_exit(NULL);
00202 }
00203  
00204 
00205 
00209 static void
00210 handle_get_read(struct context *ctx,
00211                 struct tuple *s,
00212                 int remove, int blocking)
00213 {
00214         struct ttuple *p;
00215         int sock = ctx->sock;
00216 
00217         while (1) {
00218                 sem_wait(&tuple_space_access);
00219                 for (p = first_message; p != NULL; p = p->next) {
00220                         if (tuples_match(p->tuple, s)) {
00221                                 if (remove) {
00222                                         remove_message_from_space(p);
00223                                 }
00224                                 sem_post(&tuple_space_access);
00225                                 /* ignore return value of send_tuple,
00226                                  * if client bails, it's his loss
00227                                  */
00228                                 sem_wait(&log_sem);
00229                                 LOGPRINTF("Socket %d gets/reads a tuple\n",
00230                                           sock);
00231                                 // LOGTUPLE(p->tuple);
00232                                 sem_post(&log_sem);
00233                                 if (send_tuple(ctx, p->tuple)) {
00234                                         /* Assume the client died while blocked.
00235                                          * If the tuple was removed, put it back.
00236                                          * Otherwise just kill this thread.
00237                                          */
00238                                         DBGPRINTF("remove=%d\n", remove);
00239                                         if (remove) {
00240                                                 add_tuple_to_space(p->tuple);
00241                                                 destroy_tuple(s);
00242                                                 return;
00243                                         }
00244                                 }
00245                                 if (remove) {
00246                                         destroy_tuple(p->tuple);
00247                                 }
00248                                 destroy_tuple(s);
00249                                 return;
00250                         }
00251                 }
00252                 sem_post(&tuple_space_access);
00253                 // LOGPRINTF("Socket %d couldn't find a matching tuple\n", sock);
00254                 if (blocking) {
00255                         /* wait for a tuple that might match */
00256                         num_blocked++;
00257                         sem_wait(&blocked_sem);
00258                 } else {
00259                         /* don't wait, return a failure code */
00260                         int ack = -1;
00261                         send_chunk(ctx, (char*) &ack, sizeof(int));
00262                         return;
00263                 }
00264         }
00265 }
00266 
00267 
00271 static void
00272 handle_dump_space(struct context *ctx)
00273 {
00274         struct ttuple *p;
00275         int num_templates;
00276         struct tuple **templates = NULL;
00277         int i, ok, count;
00278         struct tuple_list *x, *tlist = NULL;
00279 
00280         recv_chunk(ctx, (char*) &num_templates, sizeof(int));
00281 
00282         if (num_templates) {
00283                 templates =
00284                         (struct tuple **) malloc(num_templates *
00285                                                  sizeof(struct tuple *));
00286                 if (templates == NULL) {
00287                         /* ? ? ? ? ? ? */
00288                         return;
00289                 }
00290                 for (i = 0; i < num_templates; i++) {
00291                         templates[i] = recv_tuple(ctx);
00292                         if (templates[i] == NULL) {
00293                                 /* ? ? ? ? ? ? */
00294                                 return;
00295                         }
00296                 }
00297         }
00298 
00299         count = 0;
00300         sem_wait(&tuple_space_access);
00301         for (p = first_message; p != NULL; p = p->next) {
00302                 ok = 0;
00303                 for (i = 0; i < num_templates; i++) {
00304                         if (tuples_match(p->tuple, templates[i])) {
00305                                 ok = 1;
00306                                 break;
00307                         }
00308                 }
00309                 if (ok || num_templates == 0) {
00310                         x = malloc(sizeof(struct tuple_list));
00311                         if (x == NULL) {
00312                                 /* ? ? ? ? */
00313                                 return;
00314                         }
00315                         x->tup = p->tuple;
00316                         x->next = tlist;
00317                         tlist = x;
00318                         count++;
00319                 }
00320         }
00321         sem_post(&tuple_space_access);
00322 
00323         send_chunk(ctx, (char*) &count, sizeof(int));
00324         for (x = tlist; x != NULL; x = x->next) {
00325                 if (send_tuple(ctx, x->tup)) {
00326                         /* Assume the client died.
00327                          */
00328                         return;
00329                 }
00330         }
00331 
00332         while (tlist) {
00333                 x = tlist;
00334                 tlist = tlist->next;
00335                 free(x);
00336         }
00337 }
00338 
00339 
00340 
00345 static void *
00346 client_thread_func(void *varg)
00347 {
00348         struct context *ctx =
00349                 (struct context *) varg;
00350         struct tuple *s;
00351         unsigned int operation;
00352 
00353         if (recv_chunk(ctx, (char*) &operation, sizeof(int))) {
00354                 kill_this_thread(ctx);
00355         }
00356         if (operation == PUT) {
00357                 s = recv_tuple(ctx);
00358                 if (s == NULL) {
00359                         kill_this_thread(ctx);
00360                 }
00361                 sem_wait(&log_sem);
00362                 LOGPRINTF("*******************\n");
00363                 LOGPRINTF("PUT socket=%d  ", ctx->sock);
00364                 LOGTUPLE(s);
00365                 sem_post(&log_sem);
00366                 add_tuple_to_space(s);
00367                 /* send an ack */
00368                 send(ctx->sock, &operation, sizeof(int), MSG_NOSIGNAL);
00369         }
00370         else if (operation == GET) {
00371                 s = recv_tuple(ctx);
00372                 if (s == NULL) {
00373                         kill_this_thread(ctx);
00374                 }
00375                 sem_wait(&log_sem);
00376                 LOGPRINTF("*******************\n");
00377                 LOGPRINTF("GET socket=%d  ", ctx->sock);
00378                 LOGTUPLE(s);
00379                 sem_post(&log_sem);
00380                 handle_get_read(ctx, s, 1, 1);
00381         }
00382         else if (operation == READ) {
00383                 s = recv_tuple(ctx);
00384                 if (s == NULL) {
00385                         kill_this_thread(ctx);
00386                 }
00387                 sem_wait(&log_sem);
00388                 LOGPRINTF("*******************\n");
00389                 LOGPRINTF("READ socket=%d  ", ctx->sock);
00390                 LOGTUPLE(s);
00391                 sem_post(&log_sem);
00392                 handle_get_read(ctx, s, 0, 1);
00393         }
00394         else if (operation == GET_NB) {
00395                 s = recv_tuple(ctx);
00396                 if (s == NULL) {
00397                         kill_this_thread(ctx);
00398                 }
00399                 sem_wait(&log_sem);
00400                 LOGPRINTF("*******************\n");
00401                 LOGPRINTF("GET_NB socket=%d  ", ctx->sock);
00402                 LOGTUPLE(s);
00403                 sem_post(&log_sem);
00404                 handle_get_read(ctx, s, 1, 0);
00405         }
00406         else if (operation == READ_NB) {
00407                 s = recv_tuple(ctx);
00408                 if (s == NULL) {
00409                         kill_this_thread(ctx);
00410                 }
00411                 sem_wait(&log_sem);
00412                 LOGPRINTF("*******************\n");
00413                 LOGPRINTF("READ_NB socket=%d  ", ctx->sock);
00414                 LOGTUPLE(s);
00415                 sem_post(&log_sem);
00416                 handle_get_read(ctx, s, 0, 0);
00417         }
00418         else if (operation == DUMP) {
00419                 sem_wait(&log_sem);
00420                 LOGPRINTF("*******************\n");
00421                 LOGPRINTF("DUMP socket=%d\n", ctx->sock);
00422                 sem_post(&log_sem);
00423                 handle_dump_space(ctx);
00424         }
00425         else if (operation == LOG) {
00426                 sem_wait(&log_sem);
00427                 send(ctx->sock, logbuf, logptr, MSG_NOSIGNAL);
00428                 logptr = 0;
00429                 sem_post(&log_sem);
00430                 /* kill thread, but don't LOGPRINTF */
00431                 ctx->thr = -1;
00432                 close(ctx->sock);
00433                 pthread_exit(NULL);
00434         }
00435         else {
00436                 fprintf(stderr,
00437                         "Bad operation: %08X\n",
00438                         (unsigned int) operation);
00439         }
00440         kill_this_thread(ctx);
00441         return NULL;
00442 }
00443 
00444 
00445 
00450 int
00451 main(int argc, char *argv[])
00452 {
00453         int i, server_sock;
00454         int portnumber, status;
00455         int reuseaddr;
00456         struct sockaddr_in addr;
00457         pthread_attr_t detached;
00458 
00459         i_am_server = 1;
00460         reuseaddr = 1;
00461         
00462         if (argc < 2) {
00463                 /* help message */
00464                 fprintf(stderr, "Need a port number as a cmd line arg\n");
00465                 exit(1);
00466         }
00467 
00468         pthread_attr_init(&detached);
00469         pthread_attr_setdetachstate(&detached, 1);
00470         
00471         portnumber = atoi(argv[1]);
00472         server_sock = socket(AF_INET, SOCK_STREAM, 0);
00473         if (server_sock < 0) {
00474                 fprintf(stderr, __FILE__ " line %d\n", __LINE__);
00475                 perror("Can't create socket");
00476                 exit(1);
00477         }
00478 
00479         setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR,
00480                    &reuseaddr, sizeof(int));
00481 
00482         /* Open network listening socket */
00483         memset(&addr, 0, sizeof(addr));
00484         addr.sin_family = AF_INET;
00485         addr.sin_addr.s_addr = INADDR_ANY;
00486         addr.sin_port = htons(portnumber);
00487         
00488         status = bind(server_sock,
00489                       (struct sockaddr *)&addr,
00490                       sizeof(struct sockaddr_in));
00491         if (status < 0) {
00492                 fprintf(stderr, __FILE__ " line %d\n", __LINE__);
00493                 perror("bind() failed");
00494                 exit(1);
00495         }
00496         
00497         status = listen(server_sock, 1);
00498         if (status < 0) {
00499                 fprintf(stderr, __FILE__ " line %d\n", __LINE__);
00500                 perror("listen() failed");
00501                 exit(1);
00502         }
00503 
00504         for (i = 0; i < MAXNUMTHREADS; i++) {
00505                 client_list[i].thr = -1;
00506         }
00507 
00508         sem_init(&tuple_space_access, 0, 1);
00509         sem_init(&blocked_sem, 0, 0);
00510         sem_init(&log_sem, 0, 1);
00511         sem_init(&new_client_sem, 0, 1);
00512 
00513         while (1) {
00514                 struct context ctx;
00515                 ctx.sock = accept(server_sock, NULL, NULL);
00516                 sem_wait(&new_client_sem);
00517                 for (i = 0; i < MAXNUMTHREADS; i++)
00518                         if (client_list[i].thr == -1) {
00519                                 client_list[i] = ctx;
00520                                 if (pthread_create(&client_list[i].thr,
00521                                                    &detached,
00522                                                    client_thread_func,
00523                                                    &client_list[i])
00524                                     != 0) {
00525                                         fprintf(stderr,
00526                                                 "Threads are hosed\n");
00527                                         EXIT();
00528                                 }
00529                                 break;
00530                         }
00531                 sem_post(&new_client_sem);
00532                 /* Handle case where i == MAXNUMTHREADS, no threads
00533                  * available in our array.
00534                  */
00535         }
00536         
00537         return 0;
00538 }

Generated on Sun Mar 30 23:46:50 2003 by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002