00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <stdio.h>
00036 #include <stdarg.h>
00037 #include <unistd.h>
00038 #include <string.h>
00039 #include <stdlib.h>
00040 #include <sys/types.h>
00041 #include <sys/stat.h>
00042 #include <fcntl.h>
00043 #include <sys/socket.h>
00044 #include <arpa/inet.h>
00045 #include <netdb.h>
00046 #include <sched.h>
00047
00048 #include "tuple.h"
00049
00055 int i_am_server = 0;
00056
00057 #ifdef DEBUG
00058 int debug_counter = 0;
00059 #endif
00060
00065 char logbuf[8192];
00069 int logptr = 0;
00070
00071 #define PERROR(x) fprintf(stderr, __FILE__":%d ", __LINE__); perror(x)
00072
00073
00080 int
00081 get_server_portnumber(struct context *ctx)
00082 {
00083 char *s;
00084 s = getenv(SERVERNAME_ENVVAR);
00085 if (s == NULL)
00086 return 1;
00087 strcpy(ctx->servername, s);
00088 s = getenv(PORTNUMBER_ENVVAR);
00089 if (s == NULL)
00090 return 1;
00091 ctx->portnumber = atoi(s);
00092 return 0;
00093 }
00094
00095
00096
00100 void
00101 print_element(struct element *e)
00102 {
00103 int i, n, too_long, tag = e->tag;
00104 unsigned char *s;
00105 const int max_str_len = 10;
00106 switch (tag) {
00107 case 'i':
00108 logptr += sprintf(logbuf+logptr, "%d", e->data.i);
00109 break;
00110 case 'd':
00111 logptr += sprintf(logbuf+logptr, "%f", e->data.d);
00112 break;
00113 case 's':
00114 s = e->data.s.ptr;
00115 n = e->data.s.len;
00116 too_long = 0;
00117 if (n > max_str_len) {
00118 n = max_str_len;
00119 too_long = 1;
00120 }
00121 logbuf[logptr++] = '"';
00122 for (i = 0; i < n; i++) {
00123 if (s[i] >= ' ' && s[i] <= '~') {
00124 logbuf[logptr++] = s[i];
00125 } else {
00126 logptr += sprintf(logbuf+logptr,
00127 "\\%02X", s[i]);
00128 }
00129 }
00130 if (too_long) {
00131 logptr += sprintf(logbuf+logptr, " ...");
00132 }
00133 logbuf[logptr++] = '"';
00134 break;
00135 case '?':
00136 logptr += sprintf(logbuf+logptr, "???");
00137 break;
00138 default:
00139 logptr += sprintf(logbuf+logptr,
00140 "<<unknown field, tag=%d>>",
00141 tag);
00142 break;
00143 }
00144 }
00145
00146
00150 void
00151 print_tuple(struct tuple *s)
00152 {
00153 int i, n, flag = 0;
00154 if (logptr >= 8000)
00155 return;
00156 logptr += sprintf(logbuf+logptr, "( ");
00157 n = s->num_elts;
00158 if (n > 10) {
00159 n = 10;
00160 flag = 1;
00161 }
00162 for (i = 0; i < n; i++) {
00163 if (i > 0) {
00164 logptr += sprintf(logbuf+logptr, "\t");
00165 }
00166 print_element(&s->elements[i]);
00167 logptr += sprintf(logbuf+logptr, ",\n");
00168 }
00169 if (flag) {
00170 logptr += sprintf(logbuf+logptr, ".... ");
00171 }
00172 logptr += sprintf(logbuf+logptr, ")\n");
00173 }
00174
00175
00176
00177
00181 static struct tuple *
00182 make_tuple_internal(char *fmt, va_list ap)
00183 {
00184 int i, elt_index, len;
00185 double d;
00186 char *s;
00187 struct tuple *t;
00188
00189 t = malloc(sizeof(struct tuple));
00190 if (t == NULL) {
00191 PERROR("malloc failed");
00192 EXIT();
00193 }
00194 t->tag = I_AM_A_TUPLE;
00195 t->num_elts = 0;
00196 t->string_space = NULL;
00197 for (s = fmt; *s; s++) {
00198 if (*s != '#')
00199 t->num_elts++;
00200 }
00201 t->elements = malloc(t->num_elts * sizeof(struct element));
00202 if (t->elements == NULL) {
00203 PERROR("malloc failed");
00204 EXIT();
00205 }
00206
00207 elt_index = 0;
00208 while (*fmt) {
00209 t->elements[elt_index].tag = *fmt;
00210 switch (*fmt++) {
00211 case '?':
00212
00213 break;
00214 case 'i':
00215 i = va_arg(ap, int);
00216 t->elements[elt_index].data.i = i;
00217 break;
00218 case 'd':
00219 d = va_arg(ap, double);
00220 t->elements[elt_index].data.d = d;
00221 break;
00222 case 's':
00223 s = va_arg(ap, char *);
00224 if (*fmt == '#') {
00225
00226 len = va_arg(ap, int);
00227 fmt++;
00228 } else {
00229
00230 len = strlen(s);
00231 }
00232 t->elements[elt_index].data.s.ptr = s;
00233 t->elements[elt_index].data.s.len = len;
00234 break;
00235 }
00236 elt_index++;
00237 }
00238 return t;
00239 }
00240
00245 struct tuple *
00246 make_tuple(char *fmt, ...)
00247 {
00248 struct tuple *t;
00249 va_list ap;
00250 va_start(ap, fmt);
00251 t = make_tuple_internal(fmt, ap);
00252 va_end(ap);
00253 return t;
00254 }
00255
00256
00260 void
00261 destroy_tuple(struct tuple *t)
00262 {
00263 if (t != NULL) {
00264 free(t->elements);
00265 if (t->string_space != NULL)
00266 free(t->string_space);
00267 free(t);
00268 }
00269 }
00270
00271
00275 int
00276 tuples_match(struct tuple *s, struct tuple *t)
00277 {
00278 int i, num_elts;
00279 num_elts = s->num_elts;
00280 if (num_elts != t->num_elts)
00281 return 0;
00282 for (i = 0; i < num_elts; i++) {
00283 int s_tag = s->elements[i].tag;
00284 int t_tag = t->elements[i].tag;
00285
00286 if (s_tag != '?' && t_tag != '?') {
00287 if (s_tag != t_tag)
00288 return 0;
00289 switch (s_tag) {
00290 case 'i':
00291 if (s->elements[i].data.i
00292 != t->elements[i].data.i)
00293 return 0;
00294 break;
00295 case 'd':
00296 if (s->elements[i].data.d
00297 != t->elements[i].data.d)
00298 return 0;
00299 break;
00300 case 's':
00301 if (s->elements[i].data.s.len !=
00302 t->elements[i].data.s.len)
00303 return 0;
00304 if (strncmp(s->elements[i].data.s.ptr,
00305 t->elements[i].data.s.ptr,
00306 s->elements[i].data.s.len) != 0)
00307 return 0;
00308 break;
00309 default:
00310 break;
00311 }
00312 }
00313 }
00314 return 1;
00315 }
00316
00317
00318
00319
00320
00325 unsigned int
00326 random_int(void)
00327 {
00328 int x, sock;
00329 sock = open("/dev/urandom", O_RDONLY);
00330 read(sock, &x, sizeof(int));
00331 close(sock);
00332 return x;
00333 }
00334
00335
00336
00337
00341 int
00342 send_chunk(struct context *ctx, char *buf, int bytes_to_send)
00343 {
00344 while (bytes_to_send) {
00345 int gotten, masks = 0;
00346
00347
00348
00349
00350
00351
00352 if (i_am_server)
00353 masks = MSG_NOSIGNAL;
00354 gotten = send(ctx->sock, buf, bytes_to_send, masks);
00355 if (gotten < 0) {
00356 PERROR("send failed");
00357 return 1;
00358 }
00359 buf += gotten;
00360 bytes_to_send -= gotten;
00361 sched_yield();
00362 }
00363 return 0;
00364 }
00365
00366
00370 int
00371 send_tuple(struct context *ctx, struct tuple *t)
00372 {
00373 int i, string_length;
00374 if (send_chunk(ctx, (char*) &t->num_elts, sizeof(int))) {
00375 PERROR("send_chunk failed");
00376 return 1;
00377 }
00378 string_length = 0;
00379 for (i = 0; i < t->num_elts; i++) {
00380 if (t->elements[i].tag == 's') {
00381 string_length += t->elements[i].data.s.len;
00382 }
00383 }
00384 if (send_chunk(ctx, (char*) &string_length, sizeof(int))) {
00385 PERROR("send_chunk failed");
00386 return 1;
00387 }
00388
00389 string_length = 0;
00390 for (i = 0; i < t->num_elts; i++) {
00391 struct element *e = &t->elements[i];
00392 if (e->tag == 's') {
00393
00394 if (send_chunk(ctx, (char*) &e->tag,
00395 sizeof(int))) {
00396 PERROR("send_chunk failed");
00397 return 1;
00398 }
00399 if (send_chunk(ctx, (char*) &string_length,
00400 sizeof(int))) {
00401 PERROR("send_chunk failed");
00402 return 1;
00403 }
00404 if (send_chunk(ctx, (char*) &e->data.s.len,
00405 sizeof(int))) {
00406 PERROR("send_chunk failed");
00407 return 1;
00408 }
00409 string_length += e->data.s.len;
00410 } else {
00411 if (send_chunk(ctx, (char*) e,
00412 sizeof(struct element))) {
00413 PERROR("send_chunk failed");
00414 return 1;
00415 }
00416 }
00417 }
00418
00419 for (i = 0; i < t->num_elts; i++) {
00420 if (t->elements[i].tag == 's') {
00421 if (send_chunk(ctx,
00422 (char*) t->elements[i].data.s.ptr,
00423 t->elements[i].data.s.len)) {
00424 PERROR("send_chunk failed");
00425 return 1;
00426 }
00427 }
00428 }
00429 return 0;
00430 }
00431
00432
00436 int
00437 recv_chunk(struct context *ctx, char *buf, int size)
00438 {
00439 int gotten;
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456 gotten = recv(ctx->sock, buf, size, MSG_WAITALL);
00457 if (gotten < size) {
00458 fprintf(stderr, "%s broke connection?\n",
00459 i_am_server ? "Client" : "Server");
00460 PERROR("recv failed");
00461 return 1;
00462 }
00463 return 0;
00464 }
00465
00466
00470 struct tuple *
00471 recv_tuple(struct context *ctx)
00472 {
00473 struct tuple *s;
00474 int i, num_elts, string_length, element_size;
00475
00476 if (recv_chunk(ctx, (char*) &num_elts, sizeof(int))) {
00477 PERROR("recv_chunk failed");
00478 return NULL;
00479 }
00480
00481
00482 if (num_elts == -1) {
00483 return (struct tuple *) -1;
00484 }
00485
00486 if (recv_chunk(ctx, (char*) &string_length, sizeof(int))) {
00487 PERROR("recv_chunk failed");
00488 return NULL;
00489 }
00490
00491 s = malloc(2 * sizeof(int) +
00492 num_elts * sizeof(struct element) +
00493 string_length);
00494 if (s == NULL) {
00495 PERROR("malloc failed");
00496 EXIT();
00497 }
00498 s->tag = I_AM_A_TUPLE;
00499 s->num_elts = num_elts;
00500 s->string_length = string_length;
00501
00502 element_size = num_elts * sizeof(struct element);
00503 s->elements = malloc(element_size);
00504 if (s->elements == NULL) {
00505 PERROR("malloc failed");
00506 EXIT();
00507 }
00508 if (recv_chunk(ctx, (char*) s->elements, element_size)) {
00509 PERROR("recv_chunk failed");
00510 return NULL;
00511 }
00512
00513 if (string_length) {
00514 s->string_space = malloc(string_length);
00515 if (s->string_space == NULL) {
00516 PERROR("malloc failed");
00517 EXIT();
00518 }
00519 if (recv_chunk(ctx, s->string_space, string_length)) {
00520 PERROR("recv_chunk failed");
00521 return NULL;
00522 }
00523
00524
00525 for (i = 0; i < num_elts; i++) {
00526 if (s->elements[i].tag == 's') {
00527 int n = (int) s->elements[i].data.s.ptr;
00528 s->elements[i].data.s.ptr =
00529 s->string_space + n;
00530 }
00531 }
00532 } else {
00533 s->string_space = NULL;
00534 }
00535 return s;
00536 }
00537
00538
00539
00540
00544 static int
00545 open_client_socket(struct context *ctx)
00546 {
00547 unsigned char *p, dotted_quad[20];
00548 struct sockaddr_in addr;
00549
00550 ctx->sock = socket(AF_INET, SOCK_STREAM, 0);
00551 if (ctx->sock < 0) {
00552 PERROR("Can't create socket");
00553 return 1;
00554 }
00555
00556
00557 p = gethostbyname(ctx->servername)->h_addr_list[0];
00558 sprintf(dotted_quad, "%u.%u.%u.%u",
00559 p[0], p[1], p[2], p[3]);
00560 p = dotted_quad;
00561 memset(&addr, 0, sizeof(addr));
00562 addr.sin_family = AF_INET;
00563 addr.sin_addr.s_addr = inet_addr(p);
00564 addr.sin_port = htons(ctx->portnumber);
00565
00566 if (connect(ctx->sock,
00567 (struct sockaddr *)&addr,
00568 sizeof(struct sockaddr_in)) < 0) {
00569 PERROR("connect failed");
00570 return 1;
00571 }
00572
00573 return 0;
00574 }
00575
00576
00577
00578
00582 int
00583 put_tuple(struct tuple *s, struct context *ctx)
00584 {
00585 int op = PUT;
00586 if (open_client_socket(ctx)) {
00587 PERROR("open_client_socket failed");
00588 return 1;
00589 }
00590 if (send_chunk(ctx, (char*) &op, sizeof(int))) {
00591 PERROR("send failed");
00592 return 1;
00593 }
00594 if (send_tuple(ctx, s)) {
00595 PERROR("send_tuple failed");
00596 return 1;
00597 }
00598 if (recv_chunk(ctx, (char*) &op, sizeof(int))) {
00599 PERROR("recv_chunk failed");
00600 return 1;
00601 }
00602 close(ctx->sock);
00603 return 0;
00604 }
00605
00606
00610 struct tuple *
00611 get_tuple(struct tuple *s, struct context *ctx)
00612 {
00613 int op = GET;
00614 if (open_client_socket(ctx)) {
00615 PERROR("open_client_socket failed");
00616 return NULL;
00617 }
00618 if (send_chunk(ctx, (char*) &op, sizeof(int))) {
00619 PERROR("send failed");
00620 return NULL;
00621 }
00622 if (send_tuple(ctx, s)) {
00623 PERROR("send_tuple failed");
00624 return NULL;
00625 }
00626 DBGPRINTF("\n");
00627 s = recv_tuple(ctx);
00628 DBGPRINTF("s = %08x\n", (int) s);
00629 if (s == NULL) {
00630 PERROR("recv_tuple failed");
00631 return NULL;
00632 }
00633 if (s == (struct tuple *) -1) {
00634 PERROR("recv_tuple failed");
00635 return NULL;
00636 }
00637 close(ctx->sock);
00638 return s;
00639 }
00640
00641
00645 struct tuple *
00646 read_tuple(struct tuple *s, struct context *ctx)
00647 {
00648 int op = READ;
00649 if (open_client_socket(ctx)) {
00650 PERROR("open_client_socket failed");
00651 return NULL;
00652 }
00653 if (send_chunk(ctx, (char*) &op, sizeof(int))) {
00654 PERROR("send failed");
00655 return NULL;
00656 }
00657 if (send_tuple(ctx, s)) {
00658 PERROR("send_tuple failed");
00659 return NULL;
00660 }
00661 DBGPRINTF("\n");
00662 s = recv_tuple(ctx);
00663 DBGPRINTF("s = %08x\n", (int) s);
00664 if (s == NULL) {
00665 PERROR("recv_tuple failed");
00666 return NULL;
00667 }
00668 if (s == (struct tuple *) -1) {
00669 PERROR("recv_tuple failed");
00670 return NULL;
00671 }
00672 close(ctx->sock);
00673 return s;
00674 }
00675
00676
00680 struct tuple *
00681 get_nb_tuple(struct tuple *s, struct context *ctx)
00682 {
00683 int op = GET_NB;
00684 if (open_client_socket(ctx)) {
00685 PERROR("open_client_socket failed");
00686 return (struct tuple *) -1;
00687 }
00688 if (send_chunk(ctx, (char*) &op, sizeof(int))) {
00689 PERROR("send failed");
00690 return (struct tuple *) -1;
00691 }
00692 if (send_tuple(ctx, s)) {
00693 PERROR("send_tuple failed");
00694 return (struct tuple *) -1;
00695 }
00696 s = recv_tuple(ctx);
00697 if (s == NULL) {
00698 PERROR("recv_tuple failed");
00699 return (struct tuple *) -1;
00700 }
00701 if (s == (struct tuple *) -1) {
00702 close(ctx->sock);
00703 return NULL;
00704 }
00705 close(ctx->sock);
00706 return s;
00707 }
00708
00709
00713 struct tuple *
00714 read_nb_tuple(struct tuple *s, struct context *ctx)
00715 {
00716 int op = READ_NB;
00717 if (open_client_socket(ctx)) {
00718 PERROR("open_client_socket failed");
00719 return (struct tuple *) -1;
00720 }
00721 if (send_chunk(ctx, (char*) &op, sizeof(int))) {
00722 PERROR("send failed");
00723 return (struct tuple *) -1;
00724 }
00725 if (send_tuple(ctx, s)) {
00726 PERROR("send_tuple failed");
00727 return (struct tuple *) -1;
00728 }
00729 DBGPRINTF("\n");
00730 s = recv_tuple(ctx);
00731 DBGPRINTF("s = %08x\n", (int) s);
00732 if (s == NULL) {
00733 PERROR("recv_tuple failed");
00734 return (struct tuple *) -1;
00735 }
00736 if (s == (struct tuple *) -1) {
00737 close(ctx->sock);
00738 return NULL;
00739 }
00740 close(ctx->sock);
00741 return s;
00742 }
00743
00744
00745
00746
00747
00748
00752 struct tuple_list *
00753 dump_tuple_space(struct tuple_list *templates,
00754 struct context *ctx)
00755 {
00756 int count, op = DUMP;
00757 struct tuple_list *s, *t, *tlist;
00758
00759 for (count = 0, s = templates;
00760 s != NULL;
00761 count++, s = s->next);
00762
00763 if (open_client_socket(ctx)) {
00764 PERROR("open_client_socket failed");
00765 return NULL;
00766 }
00767
00768 if (send_chunk(ctx, (char*) &op, sizeof(int))) {
00769 PERROR("send failed");
00770 return NULL;
00771 }
00772 if (send_chunk(ctx, (char*) &count, sizeof(int))) {
00773 PERROR("send failed");
00774 return NULL;
00775 }
00776 s = templates;
00777 while (s) {
00778 if (send_tuple(ctx, s->tup)) {
00779 PERROR("send_tuple failed");
00780 return NULL;
00781 }
00782 s = s->next;
00783 }
00784
00785
00786
00787
00788
00789 tlist = NULL;
00790 if (recv_chunk(ctx, (char*) &count, sizeof(int))) {
00791 PERROR("recv_chunk failed");
00792 return NULL;
00793 }
00794 while (count--) {
00795 struct tuple_list *x = malloc(sizeof(struct tuple_list));
00796 if (x == NULL) {
00797
00798 }
00799 x->tup = recv_tuple(ctx);
00800 DBGPRINTF("x->tup = %08x\n", (int) x->tup);
00801 if (x->tup == NULL) {
00802 PERROR("recv_tuple failed");
00803 return NULL;
00804 }
00805 if (x->tup == (struct tuple *) -1) {
00806 PERROR("recv_tuple failed");
00807 return NULL;
00808 }
00809 x->next = tlist;
00810 tlist = x;
00811 }
00812
00813 DBGPRINTF("tlist = %08x\n", (int) tlist);
00814 for (t = tlist; t != NULL; ) {
00815 DBGPRINTF("t = %08x\n", (int) t);
00816 DBGPRINTF("t->tup = %08x\n", (int) t->tup);
00817 DBGPRINTF("t->next = %08x\n", (int) t->next);
00818 t = t->next;
00819 }
00820
00821 return tlist;
00822 }
00823
00824
00828 int
00829 tuple_server_log(FILE *stream, struct context *ctx)
00830 {
00831 int n, op = LOG;
00832 const int bufsize = 8192;
00833 char buf[bufsize];
00834
00835 if (open_client_socket(ctx)) {
00836 PERROR("open_client_socket failed");
00837 return 1;
00838 }
00839 if (send_chunk(ctx, (char*) &op, sizeof(int))) {
00840 close(ctx->sock);
00841 PERROR("send_chunk failed");
00842 return 1;
00843 }
00844
00845 n = read(ctx->sock, buf, bufsize);
00846 if (n < 0) {
00847 PERROR("read failed");
00848 return 1;
00849 }
00850 if (n > 0 && fwrite(buf, 1, n, stream) < n)
00851 return 0;
00852 close(ctx->sock);
00853 return 0;
00854 }
00855
00856
00857
00858
00859
00863 static struct element *
00864 tuple_field(struct tuple *s, int n)
00865 {
00866 if (n >= s->num_elts) {
00867 fprintf(stderr, "tuple_field() index too big (%d)\n", n);
00868 fprintf(stderr, "tuple only has %d elements\n", s->num_elts);
00869 EXIT();
00870 }
00871 return &s->elements[n];
00872 }
00873
00874
00878 int
00879 tuple_int_field(struct tuple *s, int n)
00880 {
00881 struct element *e;
00882 e = tuple_field(s, n);
00883 if (e->tag != 'i') {
00884 fprintf(stderr, "tuple_int_field: field %d is not an int\n", n);
00885 fprintf(stderr, "Here is the tuple\n");
00886 print_tuple(s);
00887 logbuf[logptr] = '\0';
00888 fprintf(stderr, logbuf);
00889 EXIT();
00890 }
00891 return e->data.i;
00892 }
00893
00894
00898 double
00899 tuple_double_field(struct tuple *s, int n)
00900 {
00901 struct element *e;
00902 e = tuple_field(s, n);
00903 if (e->tag != 'd') {
00904 fprintf(stderr, "tuple_double_field: field %d is not a double\n", n);
00905 fprintf(stderr, "Here is the tuple\n");
00906 print_tuple(s);
00907 logbuf[logptr] = '\0';
00908 fprintf(stderr, logbuf);
00909 EXIT();
00910 }
00911 return e->data.d;
00912 }
00913
00914
00918 char *
00919 tuple_string_field(struct tuple *s, int *len, int n)
00920 {
00921 struct element *e;
00922 e = tuple_field(s, n);
00923 if (e->tag != 's') {
00924 fprintf(stderr, "tuple_string_field: field %d is not a string\n", n);
00925 fprintf(stderr, "Here is the tuple\n");
00926 print_tuple(s);
00927 logbuf[logptr] = '\0';
00928 fprintf(stderr, logbuf);
00929 EXIT();
00930 }
00931 if (len != NULL)
00932 *len = e->data.s.len;
00933 return e->data.s.ptr;
00934 }
00935
00936
00937