00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00047 #include <stdio.h>
00048 #include <stdarg.h>
00049 #include <string.h>
00050 #include <stdlib.h>
00051 #include <malloc.h>
00052 #include <unistd.h>
00053 #include <fcntl.h>
00054 #include <errno.h>
00055 #include <sys/socket.h>
00056 #include <arpa/inet.h>
00057 #include <pthread.h>
00058 #include <semaphore.h>
00059
00060 #include "tuple.h"
00061
00062 sem_t log_sem;
00063
00064 #define LOGPRINTF(fmt,a...) \
00065 if (logptr < 8000) { logptr += sprintf(logbuf+logptr, fmt, ##a); }
00066
00067 #define LOGTUPLE(s) print_tuple(s);
00068
00069
00074 struct ttuple {
00075 struct tuple *tuple;
00076 struct ttuple *next;
00077 struct ttuple *previous;
00078 };
00079
00084 struct ttuple *first_message = NULL;
00085
00090 struct ttuple *last_message = NULL;
00091
00092
00093
00094 #define MAXNUMTHREADS 100
00095
00100 struct context client_list[MAXNUMTHREADS];
00101
00102
00108 sem_t tuple_space_access;
00109
00114 unsigned char num_blocked = 0;
00115
00119 sem_t blocked_sem;
00120
00125 sem_t new_client_sem;
00126
00127
00133 static void
00134 add_tuple_to_space(struct tuple *t)
00135 {
00136 struct ttuple *s;
00137 s = malloc(sizeof(struct ttuple));
00138 if (s == NULL) {
00139 perror("malloc failed");
00140 exit(1);
00141 }
00142 s->tuple = t;
00143 s->next = NULL;
00144 sem_wait(&tuple_space_access);
00145
00146 if (last_message == NULL)
00147 first_message = s;
00148 else
00149 last_message->next = s;
00150 s->previous = last_message;
00151 last_message = s;
00152
00153 while (num_blocked) {
00154 sem_post(&blocked_sem);
00155 num_blocked--;
00156 }
00157 sem_post(&tuple_space_access);
00158 }
00159
00160
00161
00166 static void
00167 remove_message_from_space(struct ttuple *s)
00168 {
00169 if (s->previous == NULL && s->next == NULL) {
00170
00171 first_message = last_message = NULL;
00172 }
00173 else if (s->next == NULL) {
00174
00175 last_message = s->previous;
00176 last_message->next = NULL;
00177 }
00178 else if (s->previous == NULL) {
00179
00180 first_message = s->next;
00181 first_message->previous = NULL;
00182 }
00183 else {
00184
00185 s->previous->next = s->next;
00186 s->next->previous = s->previous;
00187 }
00188 }
00189
00190
00191
00195 static void
00196 kill_this_thread(struct context *arg)
00197 {
00198 LOGPRINTF("kill thread for socket %d\n", arg->sock);
00199 arg->thr = -1;
00200 close(arg->sock);
00201 pthread_exit(NULL);
00202 }
00203
00204
00205
00209 static void
00210 handle_get_read(struct context *ctx,
00211 struct tuple *s,
00212 int remove, int blocking)
00213 {
00214 struct ttuple *p;
00215 int sock = ctx->sock;
00216
00217 while (1) {
00218 sem_wait(&tuple_space_access);
00219 for (p = first_message; p != NULL; p = p->next) {
00220 if (tuples_match(p->tuple, s)) {
00221 if (remove) {
00222 remove_message_from_space(p);
00223 }
00224 sem_post(&tuple_space_access);
00225
00226
00227
00228 sem_wait(&log_sem);
00229 LOGPRINTF("Socket %d gets/reads a tuple\n",
00230 sock);
00231
00232 sem_post(&log_sem);
00233 if (send_tuple(ctx, p->tuple)) {
00234
00235
00236
00237
00238 DBGPRINTF("remove=%d\n", remove);
00239 if (remove) {
00240 add_tuple_to_space(p->tuple);
00241 destroy_tuple(s);
00242 return;
00243 }
00244 }
00245 if (remove) {
00246 destroy_tuple(p->tuple);
00247 }
00248 destroy_tuple(s);
00249 return;
00250 }
00251 }
00252 sem_post(&tuple_space_access);
00253
00254 if (blocking) {
00255
00256 num_blocked++;
00257 sem_wait(&blocked_sem);
00258 } else {
00259
00260 int ack = -1;
00261 send_chunk(ctx, (char*) &ack, sizeof(int));
00262 return;
00263 }
00264 }
00265 }
00266
00267
00271 static void
00272 handle_dump_space(struct context *ctx)
00273 {
00274 struct ttuple *p;
00275 int num_templates;
00276 struct tuple **templates = NULL;
00277 int i, ok, count;
00278 struct tuple_list *x, *tlist = NULL;
00279
00280 recv_chunk(ctx, (char*) &num_templates, sizeof(int));
00281
00282 if (num_templates) {
00283 templates =
00284 (struct tuple **) malloc(num_templates *
00285 sizeof(struct tuple *));
00286 if (templates == NULL) {
00287
00288 return;
00289 }
00290 for (i = 0; i < num_templates; i++) {
00291 templates[i] = recv_tuple(ctx);
00292 if (templates[i] == NULL) {
00293
00294 return;
00295 }
00296 }
00297 }
00298
00299 count = 0;
00300 sem_wait(&tuple_space_access);
00301 for (p = first_message; p != NULL; p = p->next) {
00302 ok = 0;
00303 for (i = 0; i < num_templates; i++) {
00304 if (tuples_match(p->tuple, templates[i])) {
00305 ok = 1;
00306 break;
00307 }
00308 }
00309 if (ok || num_templates == 0) {
00310 x = malloc(sizeof(struct tuple_list));
00311 if (x == NULL) {
00312
00313 return;
00314 }
00315 x->tup = p->tuple;
00316 x->next = tlist;
00317 tlist = x;
00318 count++;
00319 }
00320 }
00321 sem_post(&tuple_space_access);
00322
00323 send_chunk(ctx, (char*) &count, sizeof(int));
00324 for (x = tlist; x != NULL; x = x->next) {
00325 if (send_tuple(ctx, x->tup)) {
00326
00327
00328 return;
00329 }
00330 }
00331
00332 while (tlist) {
00333 x = tlist;
00334 tlist = tlist->next;
00335 free(x);
00336 }
00337 }
00338
00339
00340
00345 static void *
00346 client_thread_func(void *varg)
00347 {
00348 struct context *ctx =
00349 (struct context *) varg;
00350 struct tuple *s;
00351 unsigned int operation;
00352
00353 if (recv_chunk(ctx, (char*) &operation, sizeof(int))) {
00354 kill_this_thread(ctx);
00355 }
00356 if (operation == PUT) {
00357 s = recv_tuple(ctx);
00358 if (s == NULL) {
00359 kill_this_thread(ctx);
00360 }
00361 sem_wait(&log_sem);
00362 LOGPRINTF("*******************\n");
00363 LOGPRINTF("PUT socket=%d ", ctx->sock);
00364 LOGTUPLE(s);
00365 sem_post(&log_sem);
00366 add_tuple_to_space(s);
00367
00368 send(ctx->sock, &operation, sizeof(int), MSG_NOSIGNAL);
00369 }
00370 else if (operation == GET) {
00371 s = recv_tuple(ctx);
00372 if (s == NULL) {
00373 kill_this_thread(ctx);
00374 }
00375 sem_wait(&log_sem);
00376 LOGPRINTF("*******************\n");
00377 LOGPRINTF("GET socket=%d ", ctx->sock);
00378 LOGTUPLE(s);
00379 sem_post(&log_sem);
00380 handle_get_read(ctx, s, 1, 1);
00381 }
00382 else if (operation == READ) {
00383 s = recv_tuple(ctx);
00384 if (s == NULL) {
00385 kill_this_thread(ctx);
00386 }
00387 sem_wait(&log_sem);
00388 LOGPRINTF("*******************\n");
00389 LOGPRINTF("READ socket=%d ", ctx->sock);
00390 LOGTUPLE(s);
00391 sem_post(&log_sem);
00392 handle_get_read(ctx, s, 0, 1);
00393 }
00394 else if (operation == GET_NB) {
00395 s = recv_tuple(ctx);
00396 if (s == NULL) {
00397 kill_this_thread(ctx);
00398 }
00399 sem_wait(&log_sem);
00400 LOGPRINTF("*******************\n");
00401 LOGPRINTF("GET_NB socket=%d ", ctx->sock);
00402 LOGTUPLE(s);
00403 sem_post(&log_sem);
00404 handle_get_read(ctx, s, 1, 0);
00405 }
00406 else if (operation == READ_NB) {
00407 s = recv_tuple(ctx);
00408 if (s == NULL) {
00409 kill_this_thread(ctx);
00410 }
00411 sem_wait(&log_sem);
00412 LOGPRINTF("*******************\n");
00413 LOGPRINTF("READ_NB socket=%d ", ctx->sock);
00414 LOGTUPLE(s);
00415 sem_post(&log_sem);
00416 handle_get_read(ctx, s, 0, 0);
00417 }
00418 else if (operation == DUMP) {
00419 sem_wait(&log_sem);
00420 LOGPRINTF("*******************\n");
00421 LOGPRINTF("DUMP socket=%d\n", ctx->sock);
00422 sem_post(&log_sem);
00423 handle_dump_space(ctx);
00424 }
00425 else if (operation == LOG) {
00426 sem_wait(&log_sem);
00427 send(ctx->sock, logbuf, logptr, MSG_NOSIGNAL);
00428 logptr = 0;
00429 sem_post(&log_sem);
00430
00431 ctx->thr = -1;
00432 close(ctx->sock);
00433 pthread_exit(NULL);
00434 }
00435 else {
00436 fprintf(stderr,
00437 "Bad operation: %08X\n",
00438 (unsigned int) operation);
00439 }
00440 kill_this_thread(ctx);
00441 return NULL;
00442 }
00443
00444
00445
00450 int
00451 main(int argc, char *argv[])
00452 {
00453 int i, server_sock;
00454 int portnumber, status;
00455 int reuseaddr;
00456 struct sockaddr_in addr;
00457 pthread_attr_t detached;
00458
00459 i_am_server = 1;
00460 reuseaddr = 1;
00461
00462 if (argc < 2) {
00463
00464 fprintf(stderr, "Need a port number as a cmd line arg\n");
00465 exit(1);
00466 }
00467
00468 pthread_attr_init(&detached);
00469 pthread_attr_setdetachstate(&detached, 1);
00470
00471 portnumber = atoi(argv[1]);
00472 server_sock = socket(AF_INET, SOCK_STREAM, 0);
00473 if (server_sock < 0) {
00474 fprintf(stderr, __FILE__ " line %d\n", __LINE__);
00475 perror("Can't create socket");
00476 exit(1);
00477 }
00478
00479 setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR,
00480 &reuseaddr, sizeof(int));
00481
00482
00483 memset(&addr, 0, sizeof(addr));
00484 addr.sin_family = AF_INET;
00485 addr.sin_addr.s_addr = INADDR_ANY;
00486 addr.sin_port = htons(portnumber);
00487
00488 status = bind(server_sock,
00489 (struct sockaddr *)&addr,
00490 sizeof(struct sockaddr_in));
00491 if (status < 0) {
00492 fprintf(stderr, __FILE__ " line %d\n", __LINE__);
00493 perror("bind() failed");
00494 exit(1);
00495 }
00496
00497 status = listen(server_sock, 1);
00498 if (status < 0) {
00499 fprintf(stderr, __FILE__ " line %d\n", __LINE__);
00500 perror("listen() failed");
00501 exit(1);
00502 }
00503
00504 for (i = 0; i < MAXNUMTHREADS; i++) {
00505 client_list[i].thr = -1;
00506 }
00507
00508 sem_init(&tuple_space_access, 0, 1);
00509 sem_init(&blocked_sem, 0, 0);
00510 sem_init(&log_sem, 0, 1);
00511 sem_init(&new_client_sem, 0, 1);
00512
00513 while (1) {
00514 struct context ctx;
00515 ctx.sock = accept(server_sock, NULL, NULL);
00516 sem_wait(&new_client_sem);
00517 for (i = 0; i < MAXNUMTHREADS; i++)
00518 if (client_list[i].thr == -1) {
00519 client_list[i] = ctx;
00520 if (pthread_create(&client_list[i].thr,
00521 &detached,
00522 client_thread_func,
00523 &client_list[i])
00524 != 0) {
00525 fprintf(stderr,
00526 "Threads are hosed\n");
00527 EXIT();
00528 }
00529 break;
00530 }
00531 sem_post(&new_client_sem);
00532
00533
00534
00535 }
00536
00537 return 0;
00538 }