00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00116 #ifdef USE_TCP
00117
00118
00119 #ifndef SHM_MEM
00120 #error "shared memory support needed (add -DSHM_MEM to Makefile.defs)"
00121 #endif
00122
00123 #define HANDLE_IO_INLINE
00124 #include "io_wait.h"
00125
00126
00127 #include <sys/time.h>
00128 #include <sys/types.h>
00129 #include <sys/select.h>
00130 #include <sys/socket.h>
00131 #ifdef HAVE_FILIO_H
00132 #include <sys/filio.h>
00133 #elif defined __OS_solaris
00134 #define BSD_COMP
00135 #endif
00136 #include <sys/ioctl.h>
00137 #include <netinet/in.h>
00138 #include <netinet/in_systm.h>
00139 #include <netinet/ip.h>
00140 #include <netinet/tcp.h>
00141 #include <sys/uio.h>
00142 #include <netdb.h>
00143 #include <stdlib.h>
00144
00145 #include <unistd.h>
00146
00147 #include <errno.h>
00148 #include <string.h>
00149
00150 #ifdef HAVE_SELECT
00151 #include <sys/select.h>
00152 #endif
00153 #include <sys/poll.h>
00154
00155
00156 #include "ip_addr.h"
00157 #include "pass_fd.h"
00158 #include "tcp_conn.h"
00159 #include "globals.h"
00160 #include "pt.h"
00161 #include "locking.h"
00162 #include "mem/mem.h"
00163 #include "mem/shm_mem.h"
00164 #include "timer.h"
00165 #include "sr_module.h"
00166 #include "tcp_server.h"
00167 #include "tcp_init.h"
00168 #include "tcp_int_send.h"
00169 #include "tcp_stats.h"
00170 #include "tcp_ev.h"
00171 #include "tsend.h"
00172 #include "timer_ticks.h"
00173 #include "local_timer.h"
00174 #ifdef CORE_TLS
00175 #include "tls/tls_server.h"
00176 #define tls_loaded() 1
00177 #else
00178 #include "tls_hooks_init.h"
00179 #include "tls_hooks.h"
00180 #endif
00181 #ifdef USE_DST_BLACKLIST
00182 #include "dst_blacklist.h"
00183 #endif
00184
00185 #include "tcp_info.h"
00186 #include "tcp_options.h"
00187 #include "ut.h"
00188 #include "cfg/cfg_struct.h"
00189
00190 #define local_malloc pkg_malloc
00191 #define local_free pkg_free
00192
00193 #include <fcntl.h>
00194
00195
00196 #ifdef NO_MSG_DONTWAIT
00197 #ifndef MSG_DONTWAIT
00198
00199 #define MSG_DONTWAIT 0
00200 #endif
00201 #endif
00202
00203
/* Build-time switches and tunables for this file. */
/* Feature switch; it is only defined here, the code testing it is outside
 * this section. */
00204 #define TCP_PASS_NEW_CONNECTION_ON_DATA
00205
00206
/* NOTE(review): presumably the backlog handed to listen(); the call itself is
 * not in this section -- confirm. */
00207 #define TCP_LISTEN_BACKLOG 1024
00208 #define SEND_FD_QUEUE
00209
00210 #define TCP_CHILD_NON_BLOCKING
/* SEND_FD_QUEUE forces TCP_CHILD_NON_BLOCKING on; the inner #ifndef only
 * matters if the unconditional define above is removed. */
00211 #ifdef SEND_FD_QUEUE
00212 #ifndef TCP_CHILD_NON_BLOCKING
00213 #define TCP_CHILD_NON_BLOCKING
00214 #endif
/* Queue limits: the maximum follows tcp_main_max_fd_no, the timeout is
 * 2000 ms converted to ticks by MS_TO_TICKS(). */
00215 #define MAX_SEND_FD_QUEUE_SIZE tcp_main_max_fd_no
00216 #define SEND_FD_QUEUE_SIZE 128
00217 #define SEND_FD_QUEUE_TIMEOUT MS_TO_TICKS(2000)
00218 #endif
00219
00220
/* NOTE(review): units (seconds vs. ticks) are not visible here -- confirm at
 * the point of use. */
00221 #define TCPCONN_TIMEOUT_MIN_RUN 1
00222 #define TCPCONN_WAIT_TIMEOUT 1
00223
00224 #ifdef TCP_ASYNC
/* Counter of bytes currently held in write queues: the wbufq functions below
 * add to / subtract from it with atomic_add_int() and compare it against the
 * tcp_wq_max configuration value. It is a null pointer here; where it is
 * allocated is outside this section. */
00225 static unsigned int* tcp_total_wq=0;
00226 #endif
00227
00228
/* Tags for the kinds of file descriptors handled by this file; F_NONE is 0.
 * NOTE(review): the names suggest listening sockets, tcp connections, tcp
 * children and other processes; the code consuming them is outside this
 * section -- confirm. */
00229 enum fd_types { F_NONE, F_SOCKINFO ,
00230 F_TCPCONN, F_TCPCHILD, F_PROC };
00231
00232
/* Optional fd cache, compiled in only when TCP_FD_CACHE is defined. */
00233 #ifdef TCP_FD_CACHE
00234
/* number of entries in the fd_cache array below */
00235 #define TCP_FD_CACHE_SIZE 8
00236
/* One cache slot: a connection pointer together with an id and an fd.
 * NOTE(review): presumably id is the connection id and fd a local descriptor
 * for that connection; the lookup code is not in this section -- confirm. */
00237 struct fd_cache_entry{
00238 struct tcp_connection* con;
00239 int id;
00240 int fd;
00241 };
00242
00243
/* the cache itself: a file-local, zero-initialised array */
00244 static struct fd_cache_entry fd_cache[TCP_FD_CACHE_SIZE];
00245 #endif
00246
/* File-scope state. Most pointers start as 0; where they are allocated is
 * outside this section. */
/* NOTE(review): presumably non-zero only in the tcp main process; it is never
 * written in this section -- confirm. */
00247 static int is_tcp_main=0;
00248
00249
00250 enum poll_types tcp_poll_method=0;
/* also used as the value of MAX_SEND_FD_QUEUE_SIZE */
00251 int tcp_main_max_fd_no=0;
00252 int tcp_max_connections=DEFAULT_TCP_MAX_CONNECTIONS;
00253 int tls_max_connections=DEFAULT_TLS_MAX_CONNECTIONS;
00254
/* Source address storage per address family; tcp_set_src_addr() fills the
 * *_addr union and then points the matching pointer at it. */
00255 static union sockaddr_union tcp_source_ipv4_addr;
00256 static union sockaddr_union* tcp_source_ipv4=0;
00257 #ifdef USE_IPV6
00258 static union sockaddr_union tcp_source_ipv6_addr;
00259 static union sockaddr_union* tcp_source_ipv6=0;
00260 #endif
00261
/* Connection counters; tcpconn_connect() dereferences them and compares them
 * with the max_connections / max_tls_connections configuration values. */
00262 static int* tcp_connections_no=0;
00263 static int* tls_connections_no=0;
00264
00265
/* alias table, indexed by an alias' hash value */
00266 struct tcp_conn_alias** tcpconn_aliases_hash=0;
00267
/* connection table, indexed by the connection's id_hash */
00268 struct tcp_connection** tcpconn_id_hash=0;
/* NOTE(review): presumably the lock behind TCPCONN_LOCK/TCPCONN_UNLOCK; the
 * macro definitions are not in this file section -- confirm. */
00269 gen_lock_t* tcpconn_lock=0;
00270
00271 struct tcp_child* tcp_children=0;
/* source of connection ids: tcpconn_new() assigns (*connection_id)++ */
00272 static int* connection_id=0;
00273
00274
00275 int unix_tcp_sock;
00276
/* protocol level passed to setsockopt() for TCP_NODELAY in init_sock_opt();
 * -1 means the option is skipped */
00277 static int tcp_proto_no=-1;
00278
00279
00280 static io_wait_h io_h;
00281
00282 static struct local_timer tcp_main_ltimer;
00283 static ticks_t tcp_main_prev_ticks;
00284
00285
00286
00287
00288
00289
00290 static int tcp_sockets_gworkers = 0;
00291
/* per-connection timer callback, installed by tcpconn_new() */
00292 static ticks_t tcpconn_main_timeout(ticks_t , struct timer_ln* , void* );
00293
/* adds an alias to a connection; every caller in this section invokes it
 * between TCPCONN_LOCK and TCPCONN_UNLOCK */
00294 inline static int _tcpconn_add_alias_unsafe(struct tcp_connection* c, int port,
00295 struct ip_addr* l_ip, int l_port,
00296 int flags);
00297
00298
00299
00300
00301
00302
00303
00304 int tcp_set_src_addr(struct ip_addr* ip)
00305 {
00306 switch (ip->af){
00307 case AF_INET:
00308 ip_addr2su(&tcp_source_ipv4_addr, ip, 0);
00309 tcp_source_ipv4=&tcp_source_ipv4_addr;
00310 break;
00311 #ifdef USE_IPV6
00312 case AF_INET6:
00313 ip_addr2su(&tcp_source_ipv6_addr, ip, 0);
00314 tcp_source_ipv6=&tcp_source_ipv6_addr;
00315 break;
00316 #endif
00317 default:
00318 return -1;
00319 }
00320 return 0;
00321 }
00322
00323
00324
/* Apply the configured tcp keepalive settings to socket s.
 *
 * Every option is compiled in only if the matching HAVE_* macro is defined
 * and is set only when its configuration value is non-zero.
 *
 * Returns 0 on success and -1 if SO_KEEPALIVE could not be enabled (the
 * remaining options are then not attempted). Failures to set the probe
 * interval, idle time or probe count are logged and otherwise ignored. */
static inline int init_sock_keepalive(int s)
{
	int val;

#ifdef HAVE_SO_KEEPALIVE
	if (cfg_get(tcp, tcp_cfg, keepalive)){
		val=1;
		if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val))<0){
			LOG(L_WARN, "WARNING: init_sock_keepalive: failed to enable"
						" SO_KEEPALIVE: %s\n", strerror(errno));
			return -1;
		}
	}
#endif
#ifdef HAVE_TCP_KEEPINTVL
	val=cfg_get(tcp, tcp_cfg, keepintvl);
	if (val!=0 &&
			setsockopt(s, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val))<0){
		LOG(L_WARN, "WARNING: init_sock_keepalive: failed to set"
					" keepalive probes interval: %s\n", strerror(errno));
	}
#endif
#ifdef HAVE_TCP_KEEPIDLE
	val=cfg_get(tcp, tcp_cfg, keepidle);
	if (val!=0 &&
			setsockopt(s, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val))<0){
		LOG(L_WARN, "WARNING: init_sock_keepalive: failed to set"
					" keepalive idle interval: %s\n", strerror(errno));
	}
#endif
#ifdef HAVE_TCP_KEEPCNT
	val=cfg_get(tcp, tcp_cfg, keepcnt);
	if (val!=0 &&
			setsockopt(s, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val))<0){
		LOG(L_WARN, "WARNING: init_sock_keepalive: failed to set"
					" maximum keepalive count: %s\n", strerror(errno));
	}
#endif
	return 0;
}
00369
00370
00371
00372
00373
00374
00375
00376 static int init_sock_opt(int s)
00377 {
00378 int flags;
00379 int optval;
00380
00381 #ifdef DISABLE_NAGLE
00382 flags=1;
00383 if ( (tcp_proto_no!=-1) && (setsockopt(s, tcp_proto_no , TCP_NODELAY,
00384 &flags, sizeof(flags))<0) ){
00385 LOG(L_WARN, "WARNING: init_sock_opt: could not disable Nagle: %s\n",
00386 strerror(errno));
00387 }
00388 #endif
00389
00390 optval = tos;
00391 if (setsockopt(s, IPPROTO_IP, IP_TOS, (void*)&optval,sizeof(optval)) ==-1){
00392 LOG(L_WARN, "WARNING: init_sock_opt: setsockopt tos: %s\n",
00393 strerror(errno));
00394
00395 }
00396 #if !defined(TCP_DONT_REUSEADDR)
00397 optval=1;
00398 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
00399 (void*)&optval, sizeof(optval))==-1){
00400 LOG(L_ERR, "ERROR: setsockopt SO_REUSEADDR %s\n",
00401 strerror(errno));
00402
00403 }
00404 #endif
00405 #ifdef HAVE_TCP_SYNCNT
00406 if ((optval=cfg_get(tcp, tcp_cfg, syncnt))){
00407 if (setsockopt(s, IPPROTO_TCP, TCP_SYNCNT, &optval,
00408 sizeof(optval))<0){
00409 LOG(L_WARN, "WARNING: init_sock_opt: failed to set"
00410 " maximum SYN retr. count: %s\n", strerror(errno));
00411 }
00412 }
00413 #endif
00414 #ifdef HAVE_TCP_LINGER2
00415 if ((optval=cfg_get(tcp, tcp_cfg, linger2))){
00416 if (setsockopt(s, IPPROTO_TCP, TCP_LINGER2, &optval,
00417 sizeof(optval))<0){
00418 LOG(L_WARN, "WARNING: init_sock_opt: failed to set"
00419 " maximum LINGER2 timeout: %s\n", strerror(errno));
00420 }
00421 }
00422 #endif
00423 #ifdef HAVE_TCP_QUICKACK
00424 if (cfg_get(tcp, tcp_cfg, delayed_ack)){
00425 optval=0;
00426 if (setsockopt(s, IPPROTO_TCP, TCP_QUICKACK, &optval,
00427 sizeof(optval))<0){
00428 LOG(L_WARN, "WARNING: init_sock_opt: failed to reset"
00429 " TCP_QUICKACK: %s\n", strerror(errno));
00430 }
00431 }
00432 #endif
00433 init_sock_keepalive(s);
00434
00435
00436 flags=fcntl(s, F_GETFL);
00437 if (flags==-1){
00438 LOG(L_ERR, "ERROR: init_sock_opt: fnctl failed: (%d) %s\n",
00439 errno, strerror(errno));
00440 goto error;
00441 }
00442 if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
00443 LOG(L_ERR, "ERROR: init_sock_opt: fcntl: set non-blocking failed:"
00444 " (%d) %s\n", errno, strerror(errno));
00445 goto error;
00446 }
00447 return 0;
00448 error:
00449 return -1;
00450 }
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466 static int init_sock_opt_accept(int s)
00467 {
00468 int flags;
00469
00470
00471 flags=fcntl(s, F_GETFL);
00472 if (flags==-1){
00473 LOG(L_ERR, "ERROR: init_sock_opt_accept: fnctl failed: (%d) %s\n",
00474 errno, strerror(errno));
00475 goto error;
00476 }
00477 if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
00478 LOG(L_ERR, "ERROR: init_sock_opt_accept: "
00479 "fcntl: set non-blocking failed: (%d) %s\n",
00480 errno, strerror(errno));
00481 goto error;
00482 }
00483 return 0;
00484 error:
00485 return -1;
00486 }
00487
00488
00489
/* close() wrapper for tcp sockets.
 *
 * The call is retried for as long as it fails with EINTR. Failures with
 * EPIPE, ENOTCONN, ECONNRESET, ECONNREFUSED, ENETUNREACH or EHOSTUNREACH are
 * reported as success.
 *
 * Returns the (non-negative) close() result, 0 for the tolerated errors
 * above, or the negative close() result for any other error (errno is left
 * as set by close()). */
static int tcp_safe_close(int s)
{
	int rc;

	for (;;){
		rc=close(s);
		if (rc>=0)
			return rc;
		switch(errno){
			case EINTR:
				/* interrupted: try again (continues the for loop) */
				continue;
			case EPIPE:
			case ENOTCONN:
			case ECONNRESET:
			case ECONNREFUSED:
			case ENETUNREACH:
			case EHOSTUNREACH:
				/* connection-level errors are not treated as a failed
				 * close */
				return 0;
			default:
				return rc;
		}
	}
}
00523
00524
00525
00526
00527
00528
00529
00530
00531 static int tcp_blocking_connect(int fd, int type, snd_flags_t* send_flags,
00532 const struct sockaddr *servaddr,
00533 socklen_t addrlen)
00534 {
00535 int n;
00536 #if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
00537 fd_set sel_set;
00538 fd_set orig_set;
00539 struct timeval timeout;
00540 #else
00541 struct pollfd pf;
00542 #endif
00543 int elapsed;
00544 int to;
00545 int ticks;
00546 int err;
00547 unsigned int err_len;
00548 int poll_err;
00549
00550 poll_err=0;
00551 to=cfg_get(tcp, tcp_cfg, connect_timeout_s);
00552 ticks=get_ticks();
00553 again:
00554 n=connect(fd, servaddr, addrlen);
00555 if (n==-1){
00556 if (errno==EINTR){
00557 elapsed=(get_ticks()-ticks)*TIMER_TICK;
00558 if (elapsed<to) goto again;
00559 else goto error_timeout;
00560 }
00561 if (errno!=EINPROGRESS && errno!=EALREADY){
00562 goto error_errno;
00563 }
00564 }else goto end;
00565
00566
00567 #if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
00568 FD_ZERO(&orig_set);
00569 FD_SET(fd, &orig_set);
00570 #else
00571 pf.fd=fd;
00572 pf.events=POLLOUT;
00573 #endif
00574 while(1){
00575 elapsed=(get_ticks()-ticks)*TIMER_TICK;
00576 if (elapsed>=to)
00577 goto error_timeout;
00578 #if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
00579 sel_set=orig_set;
00580 timeout.tv_sec=to-elapsed;
00581 timeout.tv_usec=0;
00582 n=select(fd+1, 0, &sel_set, 0, &timeout);
00583 #else
00584 n=poll(&pf, 1, (to-elapsed)*1000);
00585 #endif
00586 if (n<0){
00587 if (errno==EINTR) continue;
00588 LOG(L_ERR, "ERROR: tcp_blocking_connect %s: poll/select failed:"
00589 " (%d) %s\n",
00590 su2a((union sockaddr_union*)servaddr, addrlen),
00591 errno, strerror(errno));
00592 goto error;
00593 }else if (n==0) continue;
00594 #if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
00595 if (FD_ISSET(fd, &sel_set))
00596 #else
00597 if (pf.revents&(POLLERR|POLLHUP|POLLNVAL)){
00598 LOG(L_ERR, "ERROR: tcp_blocking_connect %s: poll error: "
00599 "flags %x\n",
00600 su2a((union sockaddr_union*)servaddr, addrlen),
00601 pf.revents);
00602 poll_err=1;
00603 }
00604 #endif
00605 {
00606 err_len=sizeof(err);
00607 getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &err_len);
00608 if ((err==0) && (poll_err==0)) goto end;
00609 if (err!=EINPROGRESS && err!=EALREADY){
00610 LOG(L_ERR, "ERROR: tcp_blocking_connect %s: SO_ERROR (%d) "
00611 "%s\n",
00612 su2a((union sockaddr_union*)servaddr, addrlen),
00613 err, strerror(err));
00614 errno=err;
00615 goto error_errno;
00616 }
00617 }
00618 }
00619 error_errno:
00620 switch(errno){
00621 case ENETUNREACH:
00622 case EHOSTUNREACH:
00623 #ifdef USE_DST_BLACKLIST
00624 dst_blacklist_su(BLST_ERR_CONNECT, type,
00625 (union sockaddr_union*)servaddr, send_flags, 0);
00626 #endif
00627 TCP_EV_CONNECT_UNREACHABLE(errno, 0, 0,
00628 (union sockaddr_union*)servaddr, type);
00629 break;
00630 case ETIMEDOUT:
00631 #ifdef USE_DST_BLACKLIST
00632 dst_blacklist_su(BLST_ERR_CONNECT, type,
00633 (union sockaddr_union*)servaddr, send_flags, 0);
00634 #endif
00635 TCP_EV_CONNECT_TIMEOUT(errno, 0, 0,
00636 (union sockaddr_union*)servaddr, type);
00637 break;
00638 case ECONNREFUSED:
00639 case ECONNRESET:
00640 #ifdef USE_DST_BLACKLIST
00641 dst_blacklist_su(BLST_ERR_CONNECT, type,
00642 (union sockaddr_union*)servaddr, send_flags, 0);
00643 #endif
00644 TCP_EV_CONNECT_RST(errno, 0, 0,
00645 (union sockaddr_union*)servaddr, type);
00646 break;
00647 case EAGAIN:
00648 TCP_EV_CONNECT_NO_MORE_PORTS(errno, 0, 0,
00649 (union sockaddr_union*)servaddr, type);
00650 break;
00651 default:
00652 TCP_EV_CONNECT_ERR(errno, 0, 0,
00653 (union sockaddr_union*)servaddr, type);
00654 }
00655 LOG(L_ERR, "ERROR: tcp_blocking_connect %s: (%d) %s\n",
00656 su2a((union sockaddr_union*)servaddr, addrlen),
00657 errno, strerror(errno));
00658 goto error;
00659 error_timeout:
00660
00661 #ifdef USE_DST_BLACKLIST
00662 dst_blacklist_su(BLST_ERR_CONNECT, type,
00663 (union sockaddr_union*)servaddr, send_flags, 0);
00664 #endif
00665 TCP_EV_CONNECT_TIMEOUT(0, 0, 0, (union sockaddr_union*)servaddr, type);
00666 LOG(L_ERR, "ERROR: tcp_blocking_connect %s: timeout %d s elapsed "
00667 "from %d s\n", su2a((union sockaddr_union*)servaddr, addrlen),
00668 elapsed, cfg_get(tcp, tcp_cfg, connect_timeout_s));
00669 error:
00670 TCP_STATS_CONNECT_FAILED();
00671 return -1;
00672 end:
00673 return 0;
00674 }
00675
00676
00677
00678 #ifdef TCP_ASYNC
00679
00680
00681
00682 #define _wbufq_empty(con) ((con)->wbuf_q.first==0)
00683
00684 #define _wbufq_non_empty(con) ((con)->wbuf_q.first!=0)
00685
00686
00687
00688 inline static int _wbufq_add(struct tcp_connection* c, const char* data,
00689 unsigned int size)
00690 {
00691 struct tcp_wbuffer_queue* q;
00692 struct tcp_wbuffer* wb;
00693 unsigned int last_free;
00694 unsigned int wb_size;
00695 unsigned int crt_size;
00696 ticks_t t;
00697
00698 q=&c->wbuf_q;
00699 t=get_ticks_raw();
00700 if (unlikely( ((q->queued+size)>cfg_get(tcp, tcp_cfg, tcpconn_wq_max)) ||
00701 ((*tcp_total_wq+size)>cfg_get(tcp, tcp_cfg, tcp_wq_max)) ||
00702 (q->first &&
00703 TICKS_LT(q->wr_timeout, t)) )){
00704 LOG(L_ERR, "ERROR: wbufq_add(%d bytes): write queue full or timeout "
00705 " (%d, total %d, last write %d s ago)\n",
00706 size, q->queued, *tcp_total_wq,
00707 TICKS_TO_S(t-(q->wr_timeout-
00708 cfg_get(tcp, tcp_cfg, send_timeout))));
00709 if (q->first && TICKS_LT(q->wr_timeout, t)){
00710 if (unlikely(c->state==S_CONN_CONNECT)){
00711 #ifdef USE_DST_BLACKLIST
00712 dst_blacklist_su( BLST_ERR_CONNECT, c->rcv.proto,
00713 &c->rcv.src_su, &c->send_flags, 0);
00714 #endif
00715 TCP_EV_CONNECT_TIMEOUT(0, TCP_LADDR(c), TCP_LPORT(c),
00716 TCP_PSU(c), TCP_PROTO(c));
00717 TCP_STATS_CONNECT_FAILED();
00718 }else{
00719 #ifdef USE_DST_BLACKLIST
00720 dst_blacklist_su( BLST_ERR_SEND, c->rcv.proto,
00721 &c->rcv.src_su, &c->send_flags, 0);
00722 #endif
00723 TCP_EV_SEND_TIMEOUT(0, &c->rcv);
00724 TCP_STATS_SEND_TIMEOUT();
00725 }
00726 }else{
00727
00728 TCP_EV_SENDQ_FULL(0, &c->rcv);
00729 TCP_STATS_SENDQ_FULL();
00730 }
00731 goto error;
00732 }
00733
00734 if (unlikely(q->last==0)){
00735 wb_size=MAX_unsigned(cfg_get(tcp, tcp_cfg, wq_blk_size), size);
00736 wb=shm_malloc(sizeof(*wb)+wb_size-1);
00737 if (unlikely(wb==0))
00738 goto error;
00739 wb->b_size=wb_size;
00740 wb->next=0;
00741 q->last=wb;
00742 q->first=wb;
00743 q->last_used=0;
00744 q->offset=0;
00745 q->wr_timeout=get_ticks_raw()+
00746 ((c->state==S_CONN_CONNECT)?
00747 S_TO_TICKS(cfg_get(tcp, tcp_cfg, connect_timeout_s)):
00748 cfg_get(tcp, tcp_cfg, send_timeout));
00749 }else{
00750 wb=q->last;
00751 }
00752
00753 while(size){
00754 last_free=wb->b_size-q->last_used;
00755 if (last_free==0){
00756 wb_size=MAX_unsigned(cfg_get(tcp, tcp_cfg, wq_blk_size), size);
00757 wb=shm_malloc(sizeof(*wb)+wb_size-1);
00758 if (unlikely(wb==0))
00759 goto error;
00760 wb->b_size=wb_size;
00761 wb->next=0;
00762 q->last->next=wb;
00763 q->last=wb;
00764 q->last_used=0;
00765 last_free=wb->b_size;
00766 }
00767 crt_size=MIN_unsigned(last_free, size);
00768 memcpy(wb->buf+q->last_used, data, crt_size);
00769 q->last_used+=crt_size;
00770 size-=crt_size;
00771 data+=crt_size;
00772 q->queued+=crt_size;
00773 atomic_add_int((int*)tcp_total_wq, crt_size);
00774 }
00775 return 0;
00776 error:
00777 return -1;
00778 }
00779
00780
00781
00782
00783
00784
00785
00786 inline static int _wbufq_insert(struct tcp_connection* c, const char* data,
00787 unsigned int size)
00788 {
00789 struct tcp_wbuffer_queue* q;
00790 struct tcp_wbuffer* wb;
00791
00792 q=&c->wbuf_q;
00793 if (likely(q->first==0))
00794 return _wbufq_add(c, data, size);
00795
00796 if (unlikely((*tcp_total_wq+size)>cfg_get(tcp, tcp_cfg, tcp_wq_max))){
00797 LOG(L_ERR, "ERROR: wbufq_insert(%d bytes): write queue full"
00798 " (%d, total %d, last write %d s ago)\n",
00799 size, q->queued, *tcp_total_wq,
00800 TICKS_TO_S(get_ticks_raw()-q->wr_timeout-
00801 cfg_get(tcp, tcp_cfg, send_timeout)));
00802 goto error;
00803 }
00804 if (unlikely(q->offset)){
00805 LOG(L_CRIT, "BUG: wbufq_insert: non-null offset %d (bad call, should"
00806 "never be called after the wbufq_run())\n", q->offset);
00807 goto error;
00808 }
00809 if ((q->first==q->last) && ((q->last->b_size-q->last_used)>=size)){
00810
00811 memmove(q->first->buf+size, q->first->buf, size);
00812 memcpy(q->first->buf, data, size);
00813 q->last_used+=size;
00814 }else{
00815
00816 wb=shm_malloc(sizeof(*wb)+size-1);
00817 if (unlikely(wb==0))
00818 goto error;
00819 wb->b_size=size;
00820
00821 wb->next=q->first;
00822 q->first=wb;
00823 memcpy(wb->buf, data, size);
00824 }
00825
00826 q->queued+=size;
00827 atomic_add_int((int*)tcp_total_wq, size);
00828 return 0;
00829 error:
00830 return -1;
00831 }
00832
00833
00834
00835
00836 inline static void _wbufq_destroy( struct tcp_wbuffer_queue* q)
00837 {
00838 struct tcp_wbuffer* wb;
00839 struct tcp_wbuffer* next_wb;
00840 int unqueued;
00841
00842 unqueued=0;
00843 if (likely(q->first)){
00844 wb=q->first;
00845 do{
00846 next_wb=wb->next;
00847 unqueued+=(wb==q->last)?q->last_used:wb->b_size;
00848 if (wb==q->first)
00849 unqueued-=q->offset;
00850 shm_free(wb);
00851 wb=next_wb;
00852 }while(wb);
00853 }
00854 memset(q, 0, sizeof(*q));
00855 atomic_add_int((int*)tcp_total_wq, -unqueued);
00856 }
00857
00858
00859
00860
00861
00862
00863 inline static int wbufq_run(int fd, struct tcp_connection* c, int* empty)
00864 {
00865 struct tcp_wbuffer_queue* q;
00866 struct tcp_wbuffer* wb;
00867 int n;
00868 int ret;
00869 int block_size;
00870 char* buf;
00871
00872 *empty=0;
00873 ret=0;
00874 lock_get(&c->write_lock);
00875 q=&c->wbuf_q;
00876 while(q->first){
00877 block_size=((q->first==q->last)?q->last_used:q->first->b_size)-
00878 q->offset;
00879 buf=q->first->buf+q->offset;
00880 n=_tcpconn_write_nb(fd, c, buf, block_size);
00881 if (likely(n>0)){
00882 ret+=n;
00883 if (likely(n==block_size)){
00884 wb=q->first;
00885 q->first=q->first->next;
00886 shm_free(wb);
00887 q->offset=0;
00888 q->queued-=block_size;
00889 atomic_add_int((int*)tcp_total_wq, -block_size);
00890 }else{
00891 q->offset+=n;
00892 q->queued-=n;
00893 atomic_add_int((int*)tcp_total_wq, -n);
00894 break;
00895 }
00896 }else{
00897 if (n<0){
00898
00899 if (!(errno==EAGAIN || errno==EWOULDBLOCK)){
00900 if (unlikely(c->state==S_CONN_CONNECT)){
00901 switch(errno){
00902 case ENETUNREACH:
00903 case EHOSTUNREACH:
00904 #ifdef USE_DST_BLACKLIST
00905 dst_blacklist_su(BLST_ERR_CONNECT,
00906 c->rcv.proto,
00907 &c->rcv.src_su,
00908 &c->send_flags, 0);
00909 #endif
00910 TCP_EV_CONNECT_UNREACHABLE(errno, TCP_LADDR(c),
00911 TCP_LPORT(c), TCP_PSU(c),
00912 TCP_PROTO(c));
00913 break;
00914 case ECONNREFUSED:
00915 case ECONNRESET:
00916 #ifdef USE_DST_BLACKLIST
00917 dst_blacklist_su(BLST_ERR_CONNECT,
00918 c->rcv.proto,
00919 &c->rcv.src_su,
00920 &c->send_flags, 0);
00921 #endif
00922 TCP_EV_CONNECT_RST(0, TCP_LADDR(c),
00923 TCP_LPORT(c), TCP_PSU(c),
00924 TCP_PROTO(c));
00925 break;
00926 default:
00927 TCP_EV_CONNECT_ERR(errno, TCP_LADDR(c),
00928 TCP_LPORT(c), TCP_PSU(c),
00929 TCP_PROTO(c));
00930 }
00931 TCP_STATS_CONNECT_FAILED();
00932 }else{
00933 switch(errno){
00934 case ECONNREFUSED:
00935 case ECONNRESET:
00936 TCP_STATS_CON_RESET();
00937
00938 case ENETUNREACH:
00939 case EHOSTUNREACH:
00940 #ifdef USE_DST_BLACKLIST
00941 dst_blacklist_su(BLST_ERR_SEND,
00942 c->rcv.proto,
00943 &c->rcv.src_su,
00944 &c->send_flags, 0);
00945 #endif
00946 break;
00947 }
00948 }
00949 ret=-1;
00950 LOG(L_ERR, "ERROR: wbuf_runq: %s [%d]\n",
00951 strerror(errno), errno);
00952 }
00953 }
00954 break;
00955 }
00956 }
00957 if (likely(q->first==0)){
00958 q->last=0;
00959 q->last_used=0;
00960 q->offset=0;
00961 *empty=1;
00962 }
00963 lock_release(&c->write_lock);
00964 if (likely(ret>0)){
00965 q->wr_timeout=get_ticks_raw()+cfg_get(tcp, tcp_cfg, send_timeout);
00966 if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT)){
00967 TCP_STATS_ESTABLISHED(c->state);
00968 c->state=S_CONN_OK;
00969 }
00970 }
00971 return ret;
00972 }
00973
00974 #endif
00975
00976
00977
00978 #if 0
00979
00980
/* Write all len bytes of buf to fd, using select() to wait whenever the
 * socket cannot take more data.
 *
 * NOTE: this function sits inside an "#if 0" block, so it is never compiled.
 * It refers to tcp_send_timeout, which is not declared anywhere in the
 * visible part of this file.
 *
 * Returns the initial len on success, -1 on a send/select error or when a
 * select() call times out after at least tcp_send_timeout ticks. */
00981 static int tcp_blocking_write(struct tcp_connection* c, int fd, char* buf,
00982 unsigned int len)
00983 {
00984 int n;
00985 fd_set sel_set;
00986 struct timeval timeout;
00987 int ticks;
00988 int initial_len;
00989
00990 initial_len=len;
00991 again:
00992
/* MSG_NOSIGNAL is passed when HAVE_MSG_NOSIGNAL is defined */
00993 n=send(fd, buf, len,
00994 #ifdef HAVE_MSG_NOSIGNAL
00995 MSG_NOSIGNAL
00996 #else
00997 0
00998 #endif
00999 );
01000 if (n<0){
01001 if (errno==EINTR) goto again;
01002 else if (errno!=EAGAIN && errno!=EWOULDBLOCK){
01003 LOG(L_ERR, "tcp_blocking_write: failed to send: (%d) %s\n",
01004 errno, strerror(errno));
01005 TCP_EV_SEND_TIMEOUT(errno, &c->rcv);
01006 TCP_STATS_SEND_TIMEOUT();
01007 goto error;
01008 }
/* EAGAIN/EWOULDBLOCK: fall through to the select() loop below */
01009 }else if (n<len){
01010
/* partial write: skip what was sent, then wait for more room */
01011 buf+=n;
01012 len-=n;
01013 }else{
01014
/* everything was written */
01015 goto end;
01016 }
01017 while(1){
01018 FD_ZERO(&sel_set);
01019 FD_SET(fd, &sel_set);
01020 timeout.tv_sec=tcp_send_timeout;
01021 timeout.tv_usec=0;
01022 ticks=get_ticks();
01023 n=select(fd+1, 0, &sel_set, 0, &timeout);
01024 if (n<0){
01025 if (errno==EINTR) continue;
01026 LOG(L_ERR, "ERROR: tcp_blocking_write: select failed: "
01027 " (%d) %s\n", errno, strerror(errno));
01028 goto error;
01029 }else if (n==0){
01030
/* select() timed out; ticks is re-read on every iteration, so only
 * the duration of this single select() call is compared */
01031 if (get_ticks()-ticks>=tcp_send_timeout){
01032 LOG(L_ERR, "ERROR: tcp_blocking_write: send timeout (%d)\n",
01033 tcp_send_timeout);
01034 goto error;
01035 }
01036 continue;
01037 }
01038 if (FD_ISSET(fd, &sel_set)){
01039
/* writable again: retry the send */
01040 goto again;
01041 }
01042 }
01043 error:
01044 return -1;
01045 end:
01046 return initial_len;
01047 }
01048 #endif
01049
01050
01051
01052 struct tcp_connection* tcpconn_new(int sock, union sockaddr_union* su,
01053 union sockaddr_union* local_addr,
01054 struct socket_info* ba, int type,
01055 int state)
01056 {
01057 struct tcp_connection *c;
01058 int rd_b_size;
01059
01060 rd_b_size=cfg_get(tcp, tcp_cfg, rd_buf_size);
01061 c=shm_malloc(sizeof(struct tcp_connection) + rd_b_size);
01062 if (c==0){
01063 LOG(L_ERR, "ERROR: tcpconn_new: mem. allocation failure\n");
01064 goto error;
01065 }
01066 memset(c, 0, sizeof(struct tcp_connection));
01067 c->s=sock;
01068 c->fd=-1;
01069 if (lock_init(&c->write_lock)==0){
01070 LOG(L_ERR, "ERROR: tcpconn_new: init lock failed\n");
01071 goto error;
01072 }
01073
01074 c->rcv.src_su=*su;
01075
01076 atomic_set(&c->refcnt, 0);
01077 local_timer_init(&c->timer, tcpconn_main_timeout, c, 0);
01078 su2ip_addr(&c->rcv.src_ip, su);
01079 c->rcv.src_port=su_getport(su);
01080 c->rcv.bind_address=ba;
01081 if (likely(local_addr)){
01082 su2ip_addr(&c->rcv.dst_ip, local_addr);
01083 c->rcv.dst_port=su_getport(local_addr);
01084 }else if (ba){
01085 c->rcv.dst_ip=ba->address;
01086 c->rcv.dst_port=ba->port_no;
01087 }
01088 print_ip("tcpconn_new: new tcp connection: ", &c->rcv.src_ip, "\n");
01089 DBG( "tcpconn_new: on port %d, type %d\n", c->rcv.src_port, type);
01090 init_tcp_req(&c->req, (char*)c+sizeof(struct tcp_connection), rd_b_size);
01091 c->id=(*connection_id)++;
01092 c->rcv.proto_reserved1=0;
01093 c->rcv.proto_reserved2=0;
01094 c->state=state;
01095 c->extra_data=0;
01096 #ifdef USE_TLS
01097 if (type==PROTO_TLS){
01098 if (tls_tcpconn_init(c, sock)==-1) goto error;
01099 }else
01100 #endif
01101 {
01102 c->type=PROTO_TCP;
01103 c->rcv.proto=PROTO_TCP;
01104 c->timeout=get_ticks_raw()+cfg_get(tcp, tcp_cfg, con_lifetime);
01105 }
01106
01107 return c;
01108
01109 error:
01110 if (c) shm_free(c);
01111 return 0;
01112 }
01113
01114
01115
01116
01117
01118
01119
01120 inline static int tcp_do_connect( union sockaddr_union* server,
01121 union sockaddr_union* from,
01122 int type,
01123 snd_flags_t* send_flags,
01124 union sockaddr_union* res_local_addr,
01125 struct socket_info** res_si,
01126 enum tcp_conn_states *state
01127 )
01128 {
01129 int s;
01130 union sockaddr_union my_name;
01131 socklen_t my_name_len;
01132 struct ip_addr ip;
01133 #ifdef TCP_ASYNC
01134 int n;
01135 #endif
01136
01137 s=socket(AF2PF(server->s.sa_family), SOCK_STREAM, 0);
01138 if (unlikely(s==-1)){
01139 LOG(L_ERR, "ERROR: tcp_do_connect %s: socket: (%d) %s\n",
01140 su2a(server, sizeof(*server)), errno, strerror(errno));
01141 goto error;
01142 }
01143 if (init_sock_opt(s)<0){
01144 LOG(L_ERR, "ERROR: tcp_do_connect %s: init_sock_opt failed\n",
01145 su2a(server, sizeof(*server)));
01146 goto error;
01147 }
01148
01149 if (unlikely(from && bind(s, &from->s, sockaddru_len(*from)) != 0)){
01150 LOG(L_WARN, "WARNING: tcp_do_connect: binding to source address"
01151 " %s failed: %s [%d]\n", su2a(from, sizeof(*from)),
01152 strerror(errno), errno);
01153 }
01154 *state=S_CONN_OK;
01155 #ifdef TCP_ASYNC
01156 if (likely(cfg_get(tcp, tcp_cfg, async))){
01157 again:
01158 n=connect(s, &server->s, sockaddru_len(*server));
01159 if (likely(n==-1)){
01160 if (likely(errno==EINPROGRESS))
01161 *state=S_CONN_CONNECT;
01162 else if (errno==EINTR) goto again;
01163 else if (errno!=EALREADY){
01164 switch(errno){
01165 case ENETUNREACH:
01166 case EHOSTUNREACH:
01167 #ifdef USE_DST_BLACKLIST
01168 dst_blacklist_su(BLST_ERR_CONNECT, type, server,
01169 send_flags, 0);
01170 #endif
01171 TCP_EV_CONNECT_UNREACHABLE(errno, 0, 0, server, type);
01172 break;
01173 case ETIMEDOUT:
01174 #ifdef USE_DST_BLACKLIST
01175 dst_blacklist_su(BLST_ERR_CONNECT, type, server,
01176 send_flags, 0);
01177 #endif
01178 TCP_EV_CONNECT_TIMEOUT(errno, 0, 0, server, type);
01179 break;
01180 case ECONNREFUSED:
01181 case ECONNRESET:
01182 #ifdef USE_DST_BLACKLIST
01183 dst_blacklist_su(BLST_ERR_CONNECT, type, server,
01184 send_flags, 0);
01185 #endif
01186 TCP_EV_CONNECT_RST(errno, 0, 0, server, type);
01187 break;
01188 case EAGAIN:
01189 TCP_EV_CONNECT_NO_MORE_PORTS(errno, 0, 0, server,type);
01190 break;
01191 default:
01192 TCP_EV_CONNECT_ERR(errno, 0, 0, server, type);
01193 }
01194 TCP_STATS_CONNECT_FAILED();
01195 LOG(L_ERR, "ERROR: tcp_do_connect: connect %s: (%d) %s\n",
01196 su2a(server, sizeof(*server)),
01197 errno, strerror(errno));
01198 goto error;
01199 }
01200 }
01201 }else{
01202 #endif
01203 if (tcp_blocking_connect(s, type, send_flags, &server->s,
01204 sockaddru_len(*server))<0){
01205 LOG(L_ERR, "ERROR: tcp_do_connect: tcp_blocking_connect %s"
01206 " failed\n", su2a(server, sizeof(*server)));
01207 goto error;
01208 }
01209 #ifdef TCP_ASYNC
01210 }
01211 #endif
01212 if (from){
01213 su2ip_addr(&ip, from);
01214 if (!ip_addr_any(&ip))
01215
01216 goto find_socket;
01217 }
01218 my_name_len=sizeof(my_name);
01219 if (unlikely(getsockname(s, &my_name.s, &my_name_len)!=0)){
01220 LOG(L_ERR, "ERROR: tcp_do_connect: getsockname failed: %s(%d)\n",
01221 strerror(errno), errno);
01222 *res_si=0;
01223 goto error;
01224 }
01225 from=&my_name;
01226 su2ip_addr(&ip, &my_name);
01227 find_socket:
01228 #ifdef USE_TLS
01229 if (unlikely(type==PROTO_TLS))
01230 *res_si=find_si(&ip, 0, PROTO_TLS);
01231 else
01232 #endif
01233 *res_si=find_si(&ip, 0, PROTO_TCP);
01234
01235 if (unlikely(*res_si==0)){
01236 LOG(L_WARN, "WARNING: tcp_do_connect %s: could not find corresponding"
01237 " listening socket for %s, using default...\n",
01238 su2a(server, sizeof(*server)), ip_addr2a(&ip));
01239 if (server->s.sa_family==AF_INET) *res_si=sendipv4_tcp;
01240 #ifdef USE_IPV6
01241 else *res_si=sendipv6_tcp;
01242 #endif
01243 }
01244 *res_local_addr=*from;
01245 return s;
01246 error:
01247 if (s!=-1) tcp_safe_close(s);
01248 return -1;
01249 }
01250
01251
01252
01253 struct tcp_connection* tcpconn_connect( union sockaddr_union* server,
01254 union sockaddr_union* from,
01255 int type, snd_flags_t* send_flags)
01256 {
01257 int s;
01258 struct socket_info* si;
01259 union sockaddr_union my_name;
01260 struct tcp_connection* con;
01261 enum tcp_conn_states state;
01262
01263 s=-1;
01264
01265 if (*tcp_connections_no >= cfg_get(tcp, tcp_cfg, max_connections)){
01266 LOG(L_ERR, "ERROR: tcpconn_connect: maximum number of connections"
01267 " exceeded (%d/%d)\n",
01268 *tcp_connections_no,
01269 cfg_get(tcp, tcp_cfg, max_connections));
01270 goto error;
01271 }
01272 if (unlikely(type==PROTO_TLS)) {
01273 if (*tls_connections_no >= cfg_get(tcp, tcp_cfg, max_tls_connections)){
01274 LM_ERR("ERROR: maximum number of tls connections"
01275 " exceeded (%d/%d)\n",
01276 *tls_connections_no,
01277 cfg_get(tcp, tcp_cfg, max_tls_connections));
01278 goto error;
01279 }
01280 }
01281
01282 s=tcp_do_connect(server, from, type, send_flags, &my_name, &si, &state);
01283 if (s==-1){
01284 LOG(L_ERR, "ERROR: tcp_do_connect %s: failed (%d) %s\n",
01285 su2a(server, sizeof(*server)), errno, strerror(errno));
01286 goto error;
01287 }
01288 con=tcpconn_new(s, server, &my_name, si, type, state);
01289 if (con==0){
01290 LOG(L_ERR, "ERROR: tcp_connect %s: tcpconn_new failed, closing the "
01291 " socket\n", su2a(server, sizeof(*server)));
01292 goto error;
01293 }
01294 tcpconn_set_send_flags(con, *send_flags);
01295 return con;
01296 error:
01297 if (s!=-1) tcp_safe_close(s);
01298 return 0;
01299 }
01300
01301
01302
01303 #ifdef TCP_CONNECT_WAIT
/* Opens the socket for an already allocated connection by calling
 * tcp_do_connect() towards c->rcv.src_su, stores the resulting local
 * address data in c->rcv and adds (or rebuilds) the hash aliases that
 * depend on the local address.
 *
 * Params: c    - connection to finish connecting
 *         from - requested local address, 0 if none was requested
 * Returns: the socket returned by tcp_do_connect() on success,
 *          -1 if tcp_do_connect() failed (c is left unchanged then).
 * The alias changes are done under TCPCONN_LOCK; the return values of
 * _tcpconn_add_alias_unsafe() are not checked. */
01304 int tcpconn_finish_connect( struct tcp_connection* c,
01305 union sockaddr_union* from)
01306 {
01307 int s;
01308 int r;
01309 union sockaddr_union local_addr;
01310 struct socket_info* si;
01311 enum tcp_conn_states state;
01312 struct tcp_conn_alias* a;
01313 int new_conn_alias_flags;
01314 /* state is filled in by tcp_do_connect() but not used afterwards in this function */
01315 s=tcp_do_connect(&c->rcv.src_su, from, c->type, &c->send_flags,
01316 &local_addr, &si, &state);
01317 if (unlikely(s==-1)){
01318 LOG(L_ERR, "ERROR: tcpconn_finish_connect %s: tcp_do_connect for %p"
01319 " failed\n", su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
01320 c);
01321 return -1;
01322 }
01323 c->rcv.bind_address=si;
01324 su2ip_addr(&c->rcv.dst_ip, &local_addr);
01325 c->rcv.dst_port=su_getport(&local_addr);
01326 /* the local ip/port reported by tcp_do_connect() are now stored in c->rcv */
01327 if (likely(from==0)){
01328 new_conn_alias_flags=cfg_get(tcp, tcp_cfg, new_conn_alias_flags);
01329 /* no local address was requested: only add the two aliases that include the local ip */
01330 TCPCONN_LOCK;
01331 _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip, 0,
01332 new_conn_alias_flags);
01333 _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
01334 c->rcv.dst_port, new_conn_alias_flags);
01335 TCPCONN_UNLOCK;
01336 }else if (su_cmp(from, &local_addr)!=1){
01337 new_conn_alias_flags=cfg_get(tcp, tcp_cfg, new_conn_alias_flags);
01338 TCPCONN_LOCK;
01339 /* su_cmp() did not return 1 (presumably: local_addr differs from the requested one - confirm) */
01340 /* => unlink every alias except con_aliases[0] from its hash list and keep only that one, */
01341 /* then re-add the aliases that depend on the local ip/port */
01342 for (r=1; r<c->aliases; r++){
01343 a=&c->con_aliases[r];
01344 tcpconn_listrm(tcpconn_aliases_hash[a->hash], a, next, prev);
01345 }
01346 c->aliases=1;
01347 /* re-create the local-address aliases using the updated dst_ip/dst_port */
01348 _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
01349 0, new_conn_alias_flags);
01350 _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
01351 c->rcv.dst_port, new_conn_alias_flags);
01352 TCPCONN_UNLOCK;
01353 }
01354 /* from!=0 and su_cmp()==1: aliases are left untouched */
01355 return s;
01356 }
01357 #endif
01358
01359
01360
01361
01362
/* Adds connection c to the id hash and creates its default aliases.
 * Computes c->id_hash, resets c->aliases to 0, then (under TCPCONN_LOCK)
 * sets F_CONN_HASHED, links c into tcpconn_id_hash[] and adds up to three
 * aliases keyed on the peer port c->rcv.src_port.
 *
 * Returns: c on success, 0 (after logging a BUG message) if c is null. */
01363 inline static struct tcp_connection* tcpconn_add(struct tcp_connection *c)
01364 {
01365 struct ip_addr zero_ip;
01366 int new_conn_alias_flags;
01367
01368 if (likely(c)){
01369 ip_addr_mk_any(c->rcv.src_ip.af, &zero_ip);
01370 c->id_hash=tcp_id_hash(c->id);
01371 c->aliases=0;
01372 new_conn_alias_flags=cfg_get(tcp, tcp_cfg, new_conn_alias_flags);
01373 TCPCONN_LOCK;
01374 c->flags|=F_CONN_HASHED;
01375 /* link c into its id hash bucket */
01376 tcpconn_listadd(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
01377 /* default aliases (all for peer port c->rcv.src_port): */
01378 /*  1. "any" local ip (zero_ip), local port 0   - always added */
01379 /*  2. local ip c->rcv.dst_ip, local port 0 */
01380 /*  3. local ip c->rcv.dst_ip, local port c->rcv.dst_port */
01381 /* 2 and 3 are added only if dst_ip has an address family set and is */
01382 /* not the "any" address; the return values of */
01383 /* _tcpconn_add_alias_unsafe() are not checked here */
01384 _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &zero_ip, 0,
01385 new_conn_alias_flags);
01386 if (likely(c->rcv.dst_ip.af && ! ip_addr_any(&c->rcv.dst_ip))){
01387 _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip, 0,
01388 new_conn_alias_flags);
01389 _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
01390 c->rcv.dst_port, new_conn_alias_flags);
01391 }
01392
01393 /* NOTE(review): the DBG below always prints con_aliases[0..2].hash, */
01394 /* even when only the first alias was added above */
01395 TCPCONN_UNLOCK;
01396 DBG("tcpconn_add: hashes: %d:%d:%d, %d\n",
01397 c->con_aliases[0].hash,
01398 c->con_aliases[1].hash,
01399 c->con_aliases[2].hash,
01400 c->id_hash);
01401 return c;
01402 }else{
01403 LOG(L_CRIT, "tcpconn_add: BUG: null connection pointer\n");
01404 return 0;
01405 }
01406 }
01407
01408
/* Unlinks connection c from the id hash and from every alias hash list it
 * is on, then resets c->aliases to 0. The connection itself is not freed.
 * This function does not take TCPCONN_LOCK itself. */
01409 static inline void _tcpconn_detach(struct tcp_connection *c)
01410 {
01411 int r;
01412 tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
01413 /* unlink all the aliases */
01414 for (r=0; r<c->aliases; r++)
01415 tcpconn_listrm(tcpconn_aliases_hash[c->con_aliases[r].hash],
01416 &c->con_aliases[r], next, prev);
01417 c->aliases = 0;
01418 }
01419
01420
01421
/* Releases what connection c owns and frees c itself with shm_free():
 * the write buffer queue (TCP_ASYNC builds, only if non-empty), the write
 * lock and, for PROTO_TLS connections in USE_TLS builds, the TLS data via
 * tls_tcpconn_clean(). It does not remove c from any hash list and does
 * not close any file descriptor. */
01422 static inline void _tcpconn_free(struct tcp_connection* c)
01423 {
01424 #ifdef TCP_ASYNC
01425 if (unlikely(_wbufq_non_empty(c)))
01426 _wbufq_destroy(&c->wbuf_q);
01427 #endif
01428 lock_destroy(&c->write_lock);
01429 #ifdef USE_TLS
01430 if (unlikely(c->type==PROTO_TLS)) tls_tcpconn_clean(c);
01431 #endif
01432 shm_free(c);
01433 }
01434
01435
01436
01437
/* Unhashes connection c (_tcpconn_detach()) and then frees it
 * (_tcpconn_free()). No lock is taken here. */
01438 void _tcpconn_rm(struct tcp_connection* c)
01439 {
01440 _tcpconn_detach(c);
01441 _tcpconn_free(c);
01442 }
01443
01444
01445
/* Removes connection c from the id hash and from all alias hash lists
 * while holding TCPCONN_LOCK, then destroys its write lock, cleans the TLS
 * data (USE_TLS builds, PROTO_TLS connections with extra_data set) and
 * frees c with shm_free().
 * NOTE(review): unlike _tcpconn_free() this function does not destroy the
 * write buffer queue and it checks c->extra_data before the TLS cleanup -
 * confirm whether the difference is intended. */
01446 void tcpconn_rm(struct tcp_connection* c)
01447 {
01448 int r;
01449 TCPCONN_LOCK;
01450 tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
01451 /* unlink all the aliases */
01452 for (r=0; r<c->aliases; r++)
01453 tcpconn_listrm(tcpconn_aliases_hash[c->con_aliases[r].hash],
01454 &c->con_aliases[r], next, prev);
01455 c->aliases = 0;
01456 TCPCONN_UNLOCK;
01457 lock_destroy(&c->write_lock);
01458 #ifdef USE_TLS
01459 if ((c->type==PROTO_TLS)&&(c->extra_data)) tls_tcpconn_clean(c);
01460 #endif
01461 shm_free(c);
01462 }
01463
01464
01465
01466
01467
01468
01469
01470
/* Looks up a connection, either by id or by address.
 *
 * Params: id     - connection id; if non-zero only the id hash is searched
 *         ip     - peer ip, used (together with port) only when id is 0
 *         port   - peer port
 *         l_ip   - local ip; an "any" address matches every local ip.
 *                  It is dereferenced only in the address branch.
 *         l_port - local port; 0 matches every local port
 * Returns: the first matching connection whose state is not S_CONN_BAD,
 *          or 0 if nothing matches (also when both id and ip are 0).
 * The function takes no lock and does not change the reference count. */
01471 struct tcp_connection* _tcpconn_find(int id, struct ip_addr* ip, int port,
01472 struct ip_addr* l_ip, int l_port)
01473 {
01474
01475 struct tcp_connection *c;
01476 struct tcp_conn_alias* a;
01477 unsigned hash;
01478 int is_local_ip_any;
01479
01480 #ifdef EXTRA_DEBUG
01481 DBG("tcpconn_find: %d port %d\n",id, port);
01482 if (ip) print_ip("tcpconn_find: ip ", ip, "\n");
01483 #endif
01484 if (likely(id)){
01485 hash=tcp_id_hash(id);
01486 for (c=tcpconn_id_hash[hash]; c; c=c->id_next){
01487 #ifdef EXTRA_DEBUG
01488 DBG("c=%p, c->id=%d, port=%d\n",c, c->id, c->rcv.src_port);
01489 print_ip("ip=", &c->rcv.src_ip, "\n");
01490 #endif
01491 if ((id==c->id)&&(c->state!=S_CONN_BAD)) return c;
01492 }
01493 }else if (likely(ip)){
01494 hash=tcp_addr_hash(ip, port, l_ip, l_port);
01495 is_local_ip_any=ip_addr_any(l_ip);
01496 for (a=tcpconn_aliases_hash[hash]; a; a=a->next){
01497 #ifdef EXTRA_DEBUG
01498 DBG("a=%p, c=%p, c->id=%d, alias port= %d port=%d\n", a, a->parent,
01499 a->parent->id, a->port, a->parent->rcv.src_port);
01500 print_ip("ip=",&a->parent->rcv.src_ip,"\n");
01501 #endif
01502 if ( (a->parent->state!=S_CONN_BAD) && (port==a->port) &&
01503 ((l_port==0) || (l_port==a->parent->rcv.dst_port)) &&
01504 (ip_addr_cmp(ip, &a->parent->rcv.src_ip)) &&
01505 (is_local_ip_any ||
01506 ip_addr_cmp(l_ip, &a->parent->rcv.dst_ip))
01507 )
01508 return a->parent;
01509 }
01510 }
01511 return 0;
01512 }
01513
01514
01515
01516
01517
01518
01519
/* Locked lookup that also takes a reference on the result.
 *
 * Params: id, ip, port - passed on to _tcpconn_find()
 *         local_addr   - local address to match, 0 for any
 *         timeout      - ticks added to the current time for the new
 *                        connection timeout
 * Returns: the connection with its refcnt incremented, or 0 if not found.
 * The lookup, the refcnt increment and the timeout update are all done
 * while holding TCPCONN_LOCK. */
01520 struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port,
01521 union sockaddr_union* local_addr,
01522 ticks_t timeout)
01523 {
01524 struct tcp_connection* c;
01525 struct ip_addr local_ip;
01526 int local_port;
01527 /* local_ip stays uninitialized when ip is 0; _tcpconn_find() reads l_ip only when ip is set */
01528 local_port=0;
01529 if (likely(ip)){
01530 if (unlikely(local_addr)){
01531 su2ip_addr(&local_ip, local_addr);
01532 local_port=su_getport(local_addr);
01533 }else{
01534 ip_addr_mk_any(ip->af, &local_ip);
01535 local_port=0;
01536 }
01537 }
01538 TCPCONN_LOCK;
01539 c=_tcpconn_find(id, ip, port, &local_ip, local_port);
01540 if (likely(c)){
01541 atomic_inc(&c->refcnt);
01542 /* the timeout is pushed forward only while reader_pid is 0 */
01543 /* (presumably: no reader process currently owns the connection */
01544 /* and updates the timeout itself - confirm) */
01545
01546 if (likely(c->reader_pid==0))
01547 c->timeout=get_ticks_raw()+timeout;
01548 }
01549 TCPCONN_UNLOCK;
01550 return c;
01551 }
01552
01553
01554
01555
01556
01557
01558
01559
01560
01561
/* Adds an alias (peer ip of c, peer port "port", local ip l_ip, local port
 * l_port) for connection c. No lock is taken here (hence "unsafe"); the
 * callers visible in this file hold TCPCONN_LOCK around the call.
 *
 * Params: c      - connection that gets the alias
 *         port   - peer port of the alias
 *         l_ip   - local ip; an "any" address matches every local ip
 *         l_port - local port; 0 matches every local port
 *         flags  - TCP_ALIAS_FORCE_ADD / TCP_ALIAS_REPLACE decide what
 *                  happens when a matching alias of another connection
 *                  already exists
 * Returns:  0 - alias added, or c already had a matching alias
 *          -1 - c is null, or (BUG) an alias was not found in its parent
 *          -2 - c already has TCP_CON_MAX_ALIASES aliases
 *          -3 - a matching alias belongs to another connection and
 *               neither FORCE_ADD nor REPLACE was requested */
01562 inline static int _tcpconn_add_alias_unsafe(struct tcp_connection* c, int port,
01563 struct ip_addr* l_ip, int l_port,
01564 int flags)
01565 {
01566 unsigned hash;
01567 struct tcp_conn_alias* a;
01568 struct tcp_conn_alias* nxt;
01569 struct tcp_connection* p;
01570 int is_local_ip_any;
01571 int i;
01572 int r;
01573
01574 a=0;
01575 is_local_ip_any=ip_addr_any(l_ip);
01576 if (likely(c)){
01577 hash=tcp_addr_hash(&c->rcv.src_ip, port, l_ip, l_port);
01578 /* search the bucket for an existing matching alias; nxt is saved first because a may be unlinked below */
01579 for (a=tcpconn_aliases_hash[hash], nxt=0; a; a=nxt){
01580 nxt=a->next;
01581 if ( (a->parent->state!=S_CONN_BAD) && (port==a->port) &&
01582 ( (l_port==0) || (l_port==a->parent->rcv.dst_port)) &&
01583 (ip_addr_cmp(&c->rcv.src_ip, &a->parent->rcv.src_ip)) &&
01584 ( is_local_ip_any ||
01585 ip_addr_cmp(&a->parent->rcv.dst_ip, l_ip))
01586 ){
01587 /* found a matching alias */
01588 if (unlikely(a->parent!=c)){
01589 if (flags & TCP_ALIAS_FORCE_ADD)
01590 /* FORCE_ADD: leave the other connection's alias in place and keep */
01591 /* scanning; the new alias for c is still added after the loop */
01592 continue;
01593 else if (flags & TCP_ALIAS_REPLACE){
01594 /* REPLACE: remove the alias from its current parent p: */
01595 /* find its index i in p->con_aliases[], unlink entries i.. from */
01596 /* their hash lists, close the gap in the array with memmove() */
01597 /* and link the moved entries (which now live at new addresses) */
01598 /* back into their hash lists */
01599 p=a->parent;
01600 for (i=0; (i<p->aliases) && (&(p->con_aliases[i])!=a);
01601 i++);
01602 if (unlikely(i==p->aliases)){
01603 LOG(L_CRIT, "BUG: _tcpconn_add_alias_unsafe: "
01604 " alias %p not found in con %p (id %d)\n",
01605 a, p, p->id);
01606 goto error_not_found;
01607 }
01608 for (r=i; r<p->aliases; r++){
01609 tcpconn_listrm(
01610 tcpconn_aliases_hash[p->con_aliases[r].hash],
01611 &p->con_aliases[r], next, prev);
01612 }
01613 if (likely((i+1)<p->aliases)){
01614 memmove(&p->con_aliases[i], &p->con_aliases[i+1],
01615 (p->aliases-i-1)*
01616 sizeof(p->con_aliases[0]));
01617 }
01618 p->aliases--;
01619 /* re-add the aliases that were shifted down */
01620 for (r=i; r<p->aliases; r++){
01621 tcpconn_listadd(
01622 tcpconn_aliases_hash[p->con_aliases[r].hash],
01623 &p->con_aliases[r], next, prev);
01624 }
01625 }else
01626 goto error_sec;
01627 }else goto ok;
01628 }
01629 }
01630 if (unlikely(c->aliases>=TCP_CON_MAX_ALIASES)) goto error_aliases;
01631 c->con_aliases[c->aliases].parent=c;
01632 c->con_aliases[c->aliases].port=port;
01633 c->con_aliases[c->aliases].hash=hash;
01634 tcpconn_listadd(tcpconn_aliases_hash[hash],
01635 &c->con_aliases[c->aliases], next, prev);
01636 c->aliases++;
01637 }else goto error_not_found;
01638 ok:
01639 #ifdef EXTRA_DEBUG
01640 if (a) DBG("_tcpconn_add_alias_unsafe: alias already present\n");
01641 else DBG("_tcpconn_add_alias_unsafe: alias port %d for hash %d, id %d\n",
01642 port, hash, c->id);
01643 #endif
01644 return 0;
01645 error_aliases:
01646 /* c already holds TCP_CON_MAX_ALIASES aliases */
01647 return -2;
01648 error_not_found:
01649 /* c is null, or the alias was missing from its parent's array (BUG) */
01650 return -1;
01651 error_sec:
01652 /* a matching alias exists for another connection and the flags */
01653 /* allow neither adding a duplicate nor replacing it */
01654 return -3;
01655 }
01656
01657
01658
01659
01660
/* Adds aliases for peer port "port" to the connection with the given id.
 *
 * Params: id    - connection id to look up
 *         port  - alias port; 0 selects SIPS_PORT for PROTO_TLS and
 *                 SIP_PORT otherwise
 *         proto - only used to pick the default port
 * Returns: 0 on success, -1 if the connection was not found or an alias
 *          could not be added (the reason is logged).
 * The lookup and the alias changes are done under TCPCONN_LOCK. */
01661 int tcpconn_add_alias(int id, int port, int proto)
01662 {
01663 struct tcp_connection* c;
01664 int ret;
01665 struct ip_addr zero_ip;
01666 int r;
01667 int alias_flags;
01668
01669 /* port 0 => default port for the protocol */
01670 port=port?port:((proto==PROTO_TLS)?SIPS_PORT:SIP_PORT);
01671 TCPCONN_LOCK;
01672 /* look the connection up by id only */
01673 c=_tcpconn_find(id, 0, 0, 0, 0);
01674 if (likely(c)){
01675 ip_addr_mk_any(c->rcv.src_ip.af, &zero_ip);
01676 alias_flags=cfg_get(tcp, tcp_cfg, alias_flags);
01677 /* alias 1: "any" local ip, local port 0; a -3 (conflict) result is tolerated */
01678 ret=_tcpconn_add_alias_unsafe(c, port, &zero_ip, 0,
01679 alias_flags);
01680 if (ret<0 && ret!=-3) goto error;
01681 /* alias 2: local ip, local port 0; -3 is tolerated here too */
01682 ret=_tcpconn_add_alias_unsafe(c, port, &c->rcv.dst_ip, 0,
01683 alias_flags);
01684 if (ret<0 && ret!=-3) goto error;
01685 /* alias 3: local ip and local port; any failure (including -3) is an error */
01686 ret=_tcpconn_add_alias_unsafe(c, port, &c->rcv.dst_ip, c->rcv.dst_port,
01687 alias_flags);
01688 if (unlikely(ret<0)) goto error;
01689 }else goto error_not_found;
01690 TCPCONN_UNLOCK;
01691 return 0;
01692 error_not_found:
01693 TCPCONN_UNLOCK;
01694 LOG(L_ERR, "ERROR: tcpconn_add_alias: no connection found for id %d\n",id);
01695 return -1;
01696 error:
01697 TCPCONN_UNLOCK; /* NOTE(review): c is still read by the log calls below, after the unlock */
01698 switch(ret){
01699 case -2:
01700 LOG(L_ERR, "ERROR: tcpconn_add_alias: too many aliases (%d)"
01701 " for connection %p (id %d) %s:%d <- %d\n",
01702 c->aliases, c, c->id, ip_addr2a(&c->rcv.src_ip),
01703 c->rcv.src_port, port);
01704 for (r=0; r<c->aliases; r++){
01705 LOG(L_ERR, "ERROR: tcpconn_add_alias: alias %d: for %p (%d)"
01706 " %s:%d <-%d hash %x\n", r, c, c->id,
01707 ip_addr2a(&c->rcv.src_ip), c->rcv.src_port,
01708 c->con_aliases[r].port, c->con_aliases[r].hash);
01709 }
01710 break;
01711 case -3:
01712 LOG(L_ERR, "ERROR: tcpconn_add_alias: possible port"
01713 " hijack attempt\n");
01714 LOG(L_ERR, "ERROR: tcpconn_add_alias: alias for %d port %d already"
01715 " present and points to another connection \n",
01716 c->id, port);
01717 break;
01718 default:
01719 LOG(L_ERR, "ERROR: tcpconn_add_alias: unknown error %d\n", ret);
01720 }
01721 return -1;
01722 }
01723
01724
01725
01726 #ifdef TCP_FD_CACHE
01727
/* Marks all TCP_FD_CACHE_SIZE entries of fd_cache[] as empty (fd = -1). */
01728 static void tcp_fd_cache_init(void)
01729 {
01730 int r;
01731 for (r=0; r<TCP_FD_CACHE_SIZE; r++)
01732 fd_cache[r].fd=-1;
01733 }
01734
01735
/* Returns the fd cache entry for connection c, or 0 if there is none.
 * The slot is selected by c->id modulo TCP_FD_CACHE_SIZE and counts as a
 * hit only if it holds an fd > 0 and both its id and its connection
 * pointer match c (so an fd value of 0 is treated like an empty slot). */
01736 inline static struct fd_cache_entry* tcp_fd_cache_get(struct tcp_connection *c)
01737 {
01738 int h;
01739
01740 h=c->id%TCP_FD_CACHE_SIZE;
01741 if ((fd_cache[h].fd>0) && (fd_cache[h].id==c->id) && (fd_cache[h].con==c))
01742 return &fd_cache[h];
01743 return 0;
01744 }
01745
01746
/* Marks cache entry e as empty by setting its fd to -1.
 * The fd itself is not closed here. */
01747 inline static void tcp_fd_cache_rm(struct fd_cache_entry* e)
01748 {
01749 e->fd=-1;
01750 }
01751
01752
/* Stores fd for connection c in the cache slot c->id modulo
 * TCP_FD_CACHE_SIZE. If the slot currently holds an fd > 0, that fd is
 * closed with tcp_safe_close() before being replaced. */
01753 inline static void tcp_fd_cache_add(struct tcp_connection *c, int fd)
01754 {
01755 int h;
01756
01757 h=c->id%TCP_FD_CACHE_SIZE;
01758 if (likely(fd_cache[h].fd>0))
01759 tcp_safe_close(fd_cache[h].fd);
01760 fd_cache[h].fd=fd;
01761 fd_cache[h].id=c->id;
01762 fd_cache[h].con=c;
01763 }
01764
01765 #endif
01766
01767
01768
/* Forward declarations of helpers that tcp_send() calls before their
 * definitions appear further down in the file. */
01769 inline static int tcpconn_chld_put(struct tcp_connection* tcpconn);
01770
01771 static int tcpconn_send_put(struct tcp_connection* c, const char* buf,
01772 unsigned len, snd_flags_t send_flags);
01773 static int tcpconn_do_send(int fd, struct tcp_connection* c,
01774 const char* buf, unsigned len,
01775 snd_flags_t send_flags, long* resp, int locked);
01776
01777 static int tcpconn_1st_send(int fd, struct tcp_connection* c,
01778 const char* buf, unsigned len,
01779 snd_flags_t send_flags, long* resp, int locked);
01780
01781
01782
01783
01784
01785
/* Sends buf/len to the destination described by dst, reusing an existing
 * connection when one is found and opening a new one otherwise.
 *
 * Params: dst  - destination: dst->id (connection id, may be 0), dst->to
 *                (address; a 0 port means "id only"), dst->proto and
 *                dst->send_flags
 *         from - local address to use, 0 for none; for a new connection a
 *                0 value is replaced by tcp_source_ipv4 / tcp_source_ipv6
 *                depending on the address family of dst->to
 *         buf, len - data to send
 * Returns: -1 on error, otherwise the value produced by the helper that
 *          did the send (tcpconn_send_put(), tcpconn_do_send(),
 *          tcpconn_1st_send() or, for an empty TLS chunk, tls_encode()).
 *
 * Overview of the paths below:
 *  - existing usable connection: tcpconn_send_put();
 *  - new connection with tcp_connect_wait and async enabled (builds with
 *    TCP_CONNECT_WAIT and TCP_ASYNC): the connection is created and
 *    hashed first, then connected, the first write is attempted and the
 *    result plus the fd are passed over unix_tcp_sock;
 *  - new connection otherwise: tcpconn_connect(), the connection is
 *    announced with CONN_NEW and the data is sent with tcpconn_do_send().
 * response[0] carries the connection pointer and response[1] the command
 * code in every message written to unix_tcp_sock. */
01786 int tcp_send(struct dest_info* dst, union sockaddr_union* from,
01787 const char* buf, unsigned len)
01788 {
01789 struct tcp_connection *c;
01790 struct ip_addr ip;
01791 int port;
01792 int fd;
01793 long response[2];
01794 int n;
01795 ticks_t con_lifetime;
01796 #ifdef USE_TLS
01797 const char* rest_buf;
01798 const char* t_buf;
01799 unsigned rest_len, t_len;
01800 long resp;
01801 snd_flags_t t_send_flags;
01802 #endif
01803 /* look for an existing connection: by id and address when a port is set, by id only otherwise */
01804 port=su_getport(&dst->to);
01805 con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
01806 if (likely(port)){
01807 su2ip_addr(&ip, &dst->to);
01808 c=tcpconn_get(dst->id, &ip, port, from, con_lifetime);
01809 }else if (likely(dst->id)){
01810 c=tcpconn_get(dst->id, 0, 0, 0, con_lifetime);
01811 }else{
01812 LOG(L_CRIT, "BUG: tcp_send called with null id & to\n");
01813 return -1;
01814 }
01815 /* the lookup with an id failed: retry by address if possible, else give up */
01816 if (likely(dst->id)){
01817 if (unlikely(c==0)) {
01818 if (likely(port)){
01819 /* try again without the id */
01820 c=tcpconn_get(0, &ip, port, from, con_lifetime);
01821 }else{
01822 LOG(L_ERR, "ERROR: tcp_send: id %d not found, dropping\n",
01823 dst->id);
01824 return -1;
01825 }
01826 }
01827 }
01828 /* no connection, or one that is flagged close-after-send => open a new one */
01829 if (unlikely((c==0) || tcpconn_close_after_send(c))){
01830 if (unlikely(c)){
01831 /* a close-after-send connection is not reused: */
01832 /* drop the reference taken by tcpconn_get() */
01833 tcpconn_chld_put(c);
01834 c=0;
01835 }
01836 /* connecting is refused when connection reuse is forced or no_connect is set */
01837 if (unlikely((dst->send_flags.f & SND_F_FORCE_CON_REUSE) ||
01838 cfg_get(tcp, tcp_cfg, no_connect)))
01839 return -1;
01840 DBG("tcp_send: no open tcp connection found, opening new one\n");
01841
01842 if (likely(from==0)){
01843 /* no source address given: use tcp_source_ipv4 / tcp_source_ipv6 */
01844 switch (dst->to.s.sa_family) {
01845 case AF_INET:
01846 from = tcp_source_ipv4;
01847 break;
01848 #ifdef USE_IPV6
01849 case AF_INET6:
01850 from = tcp_source_ipv6;
01851 break;
01852 #endif
01853 default:
01854 /* other address families: from stays 0 */
01855 break;
01856 }
01857 }
01858 #if defined(TCP_CONNECT_WAIT) && defined(TCP_ASYNC)
01859 if (likely(cfg_get(tcp, tcp_cfg, tcp_connect_wait) &&
01860 cfg_get(tcp, tcp_cfg, async) )){
01861 if (unlikely(*tcp_connections_no >=
01862 cfg_get(tcp, tcp_cfg, max_connections))){
01863 LOG(L_ERR, "ERROR: tcp_send %s: maximum number of"
01864 " connections exceeded (%d/%d)\n",
01865 su2a(&dst->to, sizeof(dst->to)),
01866 *tcp_connections_no,
01867 cfg_get(tcp, tcp_cfg, max_connections));
01868 return -1;
01869 }
01870 if (unlikely(dst->proto==PROTO_TLS)) {
01871 if (unlikely(*tls_connections_no >=
01872 cfg_get(tcp, tcp_cfg, max_tls_connections))){
01873 LM_ERR("tcp_send %s: maximum number of"
01874 " tls connections exceeded (%d/%d)\n",
01875 su2a(&dst->to, sizeof(dst->to)),
01876 *tls_connections_no,
01877 cfg_get(tcp, tcp_cfg, max_tls_connections));
01878 return -1;
01879 }
01880 }
01881 c=tcpconn_new(-1, &dst->to, from, 0, dst->proto,
01882 S_CONN_CONNECT);
01883 if (unlikely(c==0)){
01884 LOG(L_ERR, "ERROR: tcp_send %s: could not create new"
01885 " connection\n",
01886 su2a(&dst->to, sizeof(dst->to)));
01887 return -1;
01888 }
01889 c->flags|=F_CONN_PENDING|F_CONN_FD_CLOSED;
01890 tcpconn_set_send_flags(c, dst->send_flags);
01891 atomic_set(&c->refcnt, 2);
01892 /* refcnt starts at 2: conn_wait_close drops both (tcpconn_put() + tcpconn_chld_put()), */
01893 /* conn_wait_success only one. The still unconnected connection is hashed first */
01894 if (unlikely(tcpconn_add(c)==0)){
01895 LOG(L_ERR, "ERROR: tcp_send %s: could not add "
01896 "connection %p\n",
01897 su2a(&dst->to, sizeof(dst->to)),
01898 c);
01899 _tcpconn_free(c);
01900 n=-1;
01901 goto end_no_conn;
01902 }
01903 /* create and connect the socket; this also adds or rebuilds the aliases */
01904 /* that depend on the local address */
01905 if (unlikely((fd=tcpconn_finish_connect(c, from))<0)){
01906 /* fd is negative here, so conn_wait_close (reached via */
01907 /* conn_wait_error) skips closing it */
01908 LOG(L_ERR, "ERROR: tcp_send %s: tcpconn_finish_connect(%p)"
01909 " failed\n", su2a(&dst->to, sizeof(dst->to)),
01910 c);
01911 goto conn_wait_error;
01912 }
01913 /* first write on the new connection. tcpconn_1st_send() stores the */
01914 /* resulting command in response[1]; unless that command is CONN_EOF */
01915 /* or an error occurred, it is sent together with the fd over */
01916 /* unix_tcp_sock further down */
01917
01918
01919
01920
01921
01922
01923
01924 #ifdef USE_TLS
01925 if (unlikely(c->type==PROTO_TLS)) {
01926 /* TLS: the data is run through tls_encode() while holding */
01927 /* write_lock. tls_encode() returns the encoded chunk in */
01928 /* t_buf/t_len and whatever it did not encode in */
01929 /* rest_buf/rest_len; while rest_len is non-zero the encode */
01930 /* step is repeated via the redo_tls_encode label */
01931
01932
01933
01934
01935 response[1] = CONN_NOP;
01936 t_buf = buf;
01937 t_len = len;
01938 lock_get(&c->write_lock);
01939 redo_tls_encode:
01940 t_send_flags = dst->send_flags;
01941 n = tls_encode(c, &t_buf, &t_len, &rest_buf, &rest_len,
01942 &t_send_flags);
01943 /* n<0: encoding failed => release the lock and fail */
01944
01945
01946
01947
01948
01949
01950
01951
01952
01953
01954
01955
01956 if (unlikely(n < 0)) {
01957 lock_release(&c->write_lock);
01958 goto conn_wait_error;
01959 }
01960 if (likely(rest_len == 0)) {
01961 /* everything was encoded: this is the last chunk */
01962
01963 if (likely(response[1] != CONN_NEW_PENDING_WRITE)) {
01964 /* no earlier chunk left a pending write: */
01965 /* release the lock and let tcpconn_1st_send() */
01966 /* take it itself if needed (locked=0) */
01967
01968 lock_release(&c->write_lock);
01969 if (likely(t_len != 0)) {
01970 n=tcpconn_1st_send(fd, c, t_buf, t_len,
01971 t_send_flags,
01972 &response[1], 0);
01973 } else {
01974 if (response[1] == CONN_NOP) {
01975 /* nothing to send and no command produced */
01976 /* so far: report the connection as */
01977 /* CONN_NEW_COMPLETE */
01978
01979
01980
01981
01982 response[1] = CONN_NEW_COMPLETE;
01983 }
01984 }
01985
01986 } else {
01987 /* an earlier chunk is pending: append this one to */
01988 /* the write queue instead of writing it */
01989
01990 if (unlikely(t_len &&
01991 _wbufq_add(c, t_buf, t_len) < 0)) {
01992 response[1] = CONN_ERROR;
01993 n = -1;
01994 }
01995 lock_release(&c->write_lock);
01996
01997 }
01998 } else {
01999 /* unencoded data is left: write this chunk with the */
02000 /* lock held (locked=1) if no write is pending yet, */
02001 /* otherwise queue it; then encode the rest */
02002
02003
02004 if (likely((response[1] == CONN_NOP ||
02005 response[1] == CONN_NEW_COMPLETE ||
02006 response[1] == CONN_EOF ) && t_len))
02007 n = tcpconn_1st_send(fd, c, t_buf, t_len,
02008 t_send_flags,
02009 &response[1], 1);
02010 else if (unlikely(t_len &&
02011 _wbufq_add(c, t_buf, t_len) < 0)) {
02012 /* queueing failed */
02013 response[1] = CONN_ERROR;
02014 n = -1;
02015 }
02016 if (likely(n >= 0)) {
02017 /* continue with the part that is not yet encoded */
02018
02019 t_buf = rest_buf;
02020 t_len = rest_len;
02021 goto redo_tls_encode;
02022 } else {
02023 lock_release(&c->write_lock);
02024 /* n<0 is handled right after the TLS block */
02025 }
02026 }
02027 } else
02028 #endif
02029 n=tcpconn_1st_send(fd, c, buf, len, dst->send_flags,
02030 &response[1], 0);
02031 if (unlikely(n<0))
02032 goto conn_wait_error;
02033 if (unlikely(response[1]==CONN_EOF)){
02034 /* the first send ended with CONN_EOF: the connection is */
02035 /* closed and destroyed here (conn_wait_close) without */
02036 /* sending anything over unix_tcp_sock; n is kept */
02037
02038
02039 goto conn_wait_close;
02040 }
02041 /* pass the command and the fd over unix_tcp_sock */
02042 response[0]=(long)c;
02043 if (unlikely(send_fd(unix_tcp_sock, response,
02044 sizeof(response), fd) <= 0)){
02045 LOG(L_ERR, "BUG: tcp_send %s: %ld for %p"
02046 " failed:" " %s (%d)\n",
02047 su2a(&dst->to, sizeof(dst->to)),
02048 response[1], c, strerror(errno), errno);
02049 goto conn_wait_error;
02050 }
02051 goto conn_wait_success;
02052 }
02053 #endif
02054 if (unlikely((c=tcpconn_connect(&dst->to, from, dst->proto,
02055 &dst->send_flags))==0)){
02056 LOG(L_ERR, "ERROR: tcp_send %s: connect failed\n",
02057 su2a(&dst->to, sizeof(dst->to)));
02058 return -1;
02059 }
02060 tcpconn_set_send_flags(c, dst->send_flags);
02061 if (likely(c->state==S_CONN_OK))
02062 TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
02063 atomic_set(&c->refcnt, 2);
02064 /* keep a local copy of the socket in fd and set F_CONN_FD_CLOSED */
02065 fd=c->s;
02066 c->flags|=F_CONN_FD_CLOSED;
02067
02068 /* announce the new connection: CONN_NEW plus the socket are sent over */
02069 /* unix_tcp_sock */
02070
02071 response[0]=(long)c;
02072 response[1]=CONN_NEW;
02073 n=send_fd(unix_tcp_sock, response, sizeof(response), c->s);
02074 if (unlikely(n<=0)){
02075 LOG(L_ERR, "BUG: tcp_send %s: failed send_fd: %s (%d)\n",
02076 su2a(&dst->to, sizeof(dst->to)),
02077 strerror(errno), errno);
02078 /* NOTE(review): c is freed here but c->s (== fd) does not appear to be closed on this path - confirm */
02079 _tcpconn_free(c);
02080 n=-1;
02081 goto end_no_conn;
02082 }
02083
02084 #ifdef USE_TLS
02085 if (unlikely(c->type==PROTO_TLS)) {
02086 /* TLS: encode and send chunk by chunk under write_lock. */
02087 /* response[1] takes each chunk's result, except that an */
02088 /* earlier CONN_QUEUED_WRITE is only overwritten by */
02089 /* CONN_ERROR */
02090
02091 response[1] = CONN_NOP;
02092 t_buf = buf;
02093 t_len = len;
02094 lock_get(&c->write_lock);
02095 do {
02096 t_send_flags = dst->send_flags;
02097 n = tls_encode(c, &t_buf, &t_len, &rest_buf, &rest_len,
02098 &t_send_flags);
02099 if (likely(n > 0)) {
02100 n = tcpconn_do_send(fd, c, t_buf, t_len, t_send_flags,
02101 &resp, 1);
02102 if (likely(response[1] != CONN_QUEUED_WRITE ||
02103 resp == CONN_ERROR))
02104 /* keep a previous CONN_QUEUED_WRITE unless this */
02105 /* chunk failed */
02106 response[1] = resp;
02107 } else if (unlikely(n < 0)) {
02108 response[1] = CONN_ERROR;
02109 break;
02110 }
02111 /* n==0 (nothing encoded) falls through and ends the */
02112 /* loop via the n > 0 test below */
02113 t_buf = rest_buf;
02114 t_len = rest_len;
02115 } while(unlikely(rest_len && n > 0));
02116 lock_release(&c->write_lock);
02117 } else
02118 #endif
02119 n = tcpconn_do_send(fd, c, buf, len, dst->send_flags,
02120 &response[1], 0);
02121 if (unlikely(response[1] != CONN_NOP)) {
02122 response[0]=(long)c;
02123 if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
02124 BUG("tcp_main command %ld sending failed (write):"
02125 "%s (%d)\n", response[1], strerror(errno), errno);
02126 /* the command could not be delivered: release our */
02127 /* reference by hand (on success no release is done on */
02128 /* this path, presumably because the receiver of the */
02129 /* command does it - confirm) */
02130 tcpconn_chld_put(c);
02131 n=-1;
02132 }
02133 /* this path leaves through end_no_deref, i.e. without a */
02134 /* further tcpconn_chld_put(c) */
02135
02136 if (unlikely(n < 0 || response[1] == CONN_EOF)) {
02137 /* error or EOF: close our copy of the fd */
02138 tcp_safe_close(fd);
02139 } else if (response[1] == CONN_QUEUED_WRITE) {
02140 #ifdef TCP_FD_CACHE
02141 if (cfg_get(tcp, tcp_cfg, fd_cache)) {
02142 tcp_fd_cache_add(c, fd);
02143 } else
02144 #endif
02145 tcp_safe_close(fd);
02146 } else {
02147 BUG("unexpected tcpconn_do_send() return & response:"
02148 " %d, %ld\n", n, response[1]);
02149 }
02150 goto end_no_deref;
02151 }
02152 #ifdef TCP_FD_CACHE
02153 if (cfg_get(tcp, tcp_cfg, fd_cache)) {
02154 tcp_fd_cache_add(c, fd);
02155 }else
02156 #endif
02157 tcp_safe_close(fd);
02158 /* response[1] is CONN_NOP here: nothing was sent over unix_tcp_sock, */
02159 /* so the reference is released locally (release_c) */
02160 goto release_c;
02161 }
02162 /* an existing connection was found: send on it; no release is done here afterwards */
02163 n = tcpconn_send_put(c, buf, len, dst->send_flags);
02164
02165 return n;
02166 #ifdef TCP_CONNECT_WAIT
02167 conn_wait_success:
02168 #ifdef TCP_FD_CACHE
02169 if (cfg_get(tcp, tcp_cfg, fd_cache)) {
02170 tcp_fd_cache_add(c, fd);
02171 } else
02172 #endif
02173 if (unlikely (tcp_safe_close(fd) < 0))
02174 LOG(L_ERR, "closing temporary send fd for %p: %s: "
02175 "close(%d) failed (flags 0x%x): %s (%d)\n", c,
02176 su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
02177 fd, c->flags, strerror(errno), errno);
02178 tcpconn_chld_put(c);
02179 return n;
02180 conn_wait_error:
02181 n=-1;
02182 conn_wait_close:
02183 /* reached when the connect or the first send failed (through */
02184 /* conn_wait_error) or when the first send produced CONN_EOF: the */
02185 /* connection was hashed above but its command was not delivered over */
02186 /* unix_tcp_sock, so it is marked bad, unhashed and both references */
02187 /* set up above are dropped here */
02188
02189 c->state=S_CONN_BAD;
02190 /* close the fd only if the connect produced one (fd is negative when */
02191 /* tcpconn_finish_connect() failed) */
02192 if (fd>=0) {
02193 if (unlikely(tcp_safe_close(fd) < 0 ))
02194 LOG(L_ERR, "closing temporary send fd for %p: %s: "
02195 "close(%d) failed (flags 0x%x): %s (%d)\n", c,
02196 su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
02197 fd, c->flags, strerror(errno), errno);
02198 }
02199 /* remove the connection from the hashes, clear F_CONN_HASHED and drop */
02200 /* one reference while holding TCPCONN_LOCK */
02201
02202
02203 TCPCONN_LOCK;
02204 _tcpconn_detach(c);
02205 c->flags&=~F_CONN_HASHED;
02206 tcpconn_put(c);
02207 TCPCONN_UNLOCK;
02208 /* drop the second reference */
02209 tcpconn_chld_put(c);
02210 return n;
02211 #endif
02212 release_c:
02213 tcpconn_chld_put(c);
02214 end_no_deref:
02215 end_no_conn:
02216 return n;
02217 }
02218
02219
02220
/* Sends buf/len on the existing connection c.
 *
 * Params: c          - connection to send on
 *         buf, len   - data to send
 *         send_flags - send flags passed on to the send/encode helpers
 * Returns: -1 on error; len when the data was only queued; otherwise the
 *          result of tcpconn_do_send().
 *
 * Reference handling as visible below: the reference on c is released with
 * tcpconn_chld_put() on the "end"/"release_c" exits. When a command other
 * than CONN_NOP is sent over unix_tcp_sock, c is released here only if
 * sending that command failed.
 *
 * Steps: (1) in async mode, if data is already queued (or, with
 * TCP_CONNECT_WAIT, the connect is still pending) the new data is just
 * appended to the write queue; (2) otherwise an fd is obtained - the
 * connection's own fd when called from its reader process, a cached fd,
 * or one requested with CONN_GET_FD; (3) the data is sent (TLS data is
 * encoded first) and a resulting command is sent over unix_tcp_sock. */
02230 static int tcpconn_send_put(struct tcp_connection* c, const char* buf,
02231 unsigned len, snd_flags_t send_flags)
02232 {
02233 struct tcp_connection *tmp;
02234 int fd;
02235 long response[2];
02236 int n;
02237 int do_close_fd;
02238 #ifdef USE_TLS
02239 const char* rest_buf;
02240 const char* t_buf;
02241 unsigned rest_len, t_len;
02242 long resp;
02243 snd_flags_t t_send_flags;
02244 #endif
02245 #ifdef TCP_FD_CACHE
02246 struct fd_cache_entry* fd_cache_e;
02247 int use_fd_cache;
02248
02249 use_fd_cache=cfg_get(tcp, tcp_cfg, fd_cache);
02250 fd_cache_e=0;
02251 #endif
02252 do_close_fd=1;
02253 response[1] = CONN_NOP;
02254 #ifdef TCP_ASYNC
02255 /* async mode: the condition is tested once without the lock and again with write_lock held */
02256 #ifdef TCP_CONNECT_WAIT
02257 if (unlikely(cfg_get(tcp, tcp_cfg, async) &&
02258 (_wbufq_non_empty(c) || (c->flags&F_CONN_PENDING)) ))
02259 #else
02260 if (unlikely(cfg_get(tcp, tcp_cfg, async) && (_wbufq_non_empty(c)) ))
02261 #endif
02262 {
02263 lock_get(&c->write_lock);
02264 #ifdef TCP_CONNECT_WAIT
02265 if (likely(_wbufq_non_empty(c) || (c->flags&F_CONN_PENDING)))
02266 #else
02267 if (likely(_wbufq_non_empty(c)))
02268 #endif
02269 {
02270 do_close_fd=0;
02271 #ifdef USE_TLS
02272 if (unlikely(c->type==PROTO_TLS)) {
02273 t_buf = buf;
02274 t_len = len;
02275 do {
02276 t_send_flags = send_flags;
02277 n = tls_encode(c, &t_buf, &t_len,
02278 &rest_buf, &rest_len,
02279 &t_send_flags);
02280 if (unlikely((n < 0) || (t_len &&
02281 (_wbufq_add(c, t_buf, t_len) < 0)))) {
02282 lock_release(&c->write_lock);
02283 n=-1;
02284 response[1] = CONN_ERROR;
02285 c->state=S_CONN_BAD;
02286 c->timeout=get_ticks_raw();
02287 goto error;
02288 }
02289 t_buf = rest_buf;
02290 t_len = rest_len;
02291 } while(unlikely(rest_len && n > 0));
02292 } else
02293 #endif
02294 if (unlikely(len && (_wbufq_add(c, buf, len)<0))){
02295 lock_release(&c->write_lock);
02296 n=-1;
02297 response[1] = CONN_ERROR;
02298 c->state=S_CONN_BAD;
02299 c->timeout=get_ticks_raw();
02300 goto error;
02301 }
02302 n=len;
02303 lock_release(&c->write_lock);
02304 goto release_c;
02305 }
02306 lock_release(&c->write_lock);
02307 }
02308 #endif
02309 /* nothing was queued: get an fd to write on */
02310
02311 if (c->reader_pid==my_pid()){
02312 DBG("tcp_send: send from reader (%d (%d)), reusing fd\n",
02313 my_pid(), process_no);
02314 fd=c->fd;
02315 do_close_fd=0;
02316 #ifdef TCP_FD_CACHE
02317 use_fd_cache=0;
02318 /* this process is the connection's reader: c->fd is used directly, */
02319 /* is neither cached nor closed here */
02320
02321 }else if (likely(use_fd_cache &&
02322 ((fd_cache_e=tcp_fd_cache_get(c))!=0))){
02323 fd=fd_cache_e->fd;
02324 do_close_fd=0;
02325 DBG("tcp_send: found fd in cache ( %d, %p, %d)\n",
02326 fd, c, fd_cache_e->id);
02327 #endif
02328 }else{
02329 DBG("tcp_send: tcp connection found (%p), acquiring fd\n", c);
02330 /* request the fd with a CONN_GET_FD command over unix_tcp_sock */
02331 response[0]=(long)c;
02332 response[1]=CONN_GET_FD;
02333 n=send_all(unix_tcp_sock, response, sizeof(response));
02334 if (unlikely(n<=0)){
02335 LOG(L_ERR, "BUG: tcp_send: failed to get fd(write):%s (%d)\n",
02336 strerror(errno), errno);
02337 n=-1;
02338 goto release_c;
02339 }
02340 DBG("tcp_send, c= %p, n=%d\n", c, n);
02341 n=receive_fd(unix_tcp_sock, &tmp, sizeof(tmp), &fd, MSG_WAITALL);
02342 if (unlikely(n<=0)){
02343 LOG(L_ERR, "BUG: tcp_send: failed to get fd(receive_fd):"
02344 " %s (%d)\n", strerror(errno), errno);
02345 n=-1;
02346 do_close_fd=0;
02347 goto release_c;
02348 }
02349 /* fail if the reply names another connection (tmp!=c), carries */
02350 /* no fd (fd==-1), or c was marked S_CONN_BAD in the meantime; */
02351 /* only a non-zero tmp that differs from c is reported as a BUG */
02352
02353
02354 if (unlikely(c!=tmp || fd==-1 || c->state==S_CONN_BAD)){
02355 if (unlikely(c!=tmp && tmp!=0))
02356 BUG("tcp_send: get_fd: got different connection:"
02357 " %p (id= %d, refcnt=%d state=%d) != "
02358 " %p (n=%d)\n",
02359 c, c->id, atomic_get(&c->refcnt), c->state,
02360 tmp, n
02361 );
02362 n=-1;
02363 /* close the received fd if there is one and never cache it */
02364 do_close_fd = (fd==-1)?0:1;
02365 #ifdef TCP_FD_CACHE
02366 use_fd_cache = 0;
02367 #endif
02368 goto end;
02369 }
02370 DBG("tcp_send: after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
02371 }
02372
02373 #ifdef USE_TLS
02374 if (unlikely(c->type==PROTO_TLS)) {
02375 /* TLS: encode and send chunk by chunk under write_lock. */
02376 /* response[1] takes each chunk's result, except that an */
02377 /* earlier CONN_QUEUED_WRITE is only overwritten by */
02378 /* CONN_ERROR */
02379
02380 response[1] = CONN_NOP;
02381 t_buf = buf;
02382 t_len = len;
02383 lock_get(&c->write_lock);
02384 do {
02385 t_send_flags = send_flags;
02386 n = tls_encode(c, &t_buf, &t_len, &rest_buf, &rest_len,
02387 &t_send_flags);
02388 if (likely(n > 0)) {
02389 n = tcpconn_do_send(fd, c, t_buf, t_len, t_send_flags,
02390 &resp, 1);
02391 if (likely(response[1] != CONN_QUEUED_WRITE ||
02392 resp == CONN_ERROR))
02393 /* keep a previous CONN_QUEUED_WRITE unless this */
02394 /* chunk failed */
02395 response[1] = resp;
02396 } else if (unlikely(n < 0)) {
02397 response[1] = CONN_ERROR;
02398 break;
02399 }
02400 /* n==0 (nothing encoded) falls through and ends the loop */
02401 /* via the n > 0 test below */
02402 t_buf = rest_buf;
02403 t_len = rest_len;
02404 } while(unlikely(rest_len && n > 0));
02405 lock_release(&c->write_lock);
02406 } else
02407 #endif
02408 n = tcpconn_do_send(fd, c, buf, len, send_flags, &response[1], 0);
02409 if (unlikely(response[1] != CONN_NOP)) {
02410 error:
02411 response[0]=(long)c;
02412 if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
02413 BUG("tcp_main command %ld sending failed (write):%s (%d)\n",
02414 response[1], strerror(errno), errno);
02415 /* the command could not be delivered: release our reference */
02416 /* by hand (on success no release is done on this path, */
02417 /* presumably because the receiver of the command does it - */
02418 /* confirm) */
02419 tcpconn_chld_put(c);
02420 n=-1;
02421 }
02422 /* this path returns directly below, without going through */
02423 /* release_c */
02424
02425 if (unlikely(n < 0 || response[1] == CONN_EOF)) {
02426 /* error or EOF: drop a cached fd entry and close it, or close the fd obtained above */
02427 #ifdef TCP_FD_CACHE
02428 if (unlikely(fd_cache_e)){
02429 tcp_fd_cache_rm(fd_cache_e);
02430 fd_cache_e = 0;
02431 tcp_safe_close(fd);
02432 }else
02433 #endif
02434 if (do_close_fd) tcp_safe_close(fd);
02435 } else if (response[1] == CONN_QUEUED_WRITE) {
02436 #ifdef TCP_FD_CACHE
02437 if (unlikely((fd_cache_e==0) && use_fd_cache)){
02438 tcp_fd_cache_add(c, fd);
02439 }else
02440 #endif
02441 if (do_close_fd) tcp_safe_close(fd);
02442 } else {
02443 BUG("unexpected tcpconn_do_send() return & response: %d, %ld\n",
02444 n, response[1]);
02445 }
02446 return n;
02447 }
02448 end:
02449 #ifdef TCP_FD_CACHE
02450 if (unlikely((fd_cache_e==0) && use_fd_cache)){
02451 tcp_fd_cache_add(c, fd);
02452 }else
02453 #endif
02454 if (do_close_fd) {
02455 if (unlikely(tcp_safe_close(fd) < 0))
02456 LOG(L_ERR, "closing temporary send fd for %p: %s: "
02457 "close(%d) failed (flags 0x%x): %s (%d)\n", c,
02458 su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
02459 fd, c->flags, strerror(errno), errno);
02460 }
02461 /* only CONN_NOP results get here: nothing was sent over */
02462 /* unix_tcp_sock, so the reference is released locally */
02463 release_c:
02464 tcpconn_chld_put(c);
02465 return n;
02466 }
02467
02468
02469
02470
02471
02472
02473
02474
02475
02476
02477
02478
02479
02480
02481
02482
/* Sends buf/len on fd for connection c by calling tcpconn_do_send() with
 * locked=1, i.e. tcpconn_do_send() neither takes nor releases
 * c->write_lock (presumably the caller already holds it - confirm).
 *
 * If tcpconn_do_send() produced a command other than CONN_NOP, that
 * command is sent over unix_tcp_sock after taking an extra reference on
 * c; the extra reference is given back here only when sending the command
 * fails.
 * Returns: the result of tcpconn_do_send(), or -1 if the command could
 *          not be sent. */
02483 int tcpconn_send_unsafe(int fd, struct tcp_connection *c,
02484 const char* buf, unsigned len, snd_flags_t send_flags)
02485 {
02486 int n;
02487 long response[2];
02488
02489 n = tcpconn_do_send(fd, c, buf, len, send_flags, &response[1], 1);
02490 if (unlikely(response[1] != CONN_NOP)) {
02491 /* take an extra reference before sending the command */
02492 /* (presumably the receiver of the command drops one reference, */
02493 /* so this keeps the caller's own reference valid - confirm) */
02494
02495
02496 atomic_inc(&c->refcnt);
02497 response[0]=(long)c;
02498 if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
02499 BUG("connection %p command %ld sending failed (write):%s (%d)\n",
02500 c, response[1], strerror(errno), errno);
02501 /* sending failed: give the extra reference back by hand */
02502 tcpconn_chld_put(c);
02503 n=-1;
02504 }
02505
02506
02507
02508 return n;
02509 }
02510 return n;
02511 }
02512
02513
02514
/* Low level send of buf/len on fd for connection c.
 *
 * Params: fd         - descriptor to write to
 *         c          - connection (state, flags, write queue and lock)
 *         buf, len   - data to send
 *         send_flags - stored on c with tcpconn_set_send_flags();
 *                      SND_F_CON_CLOSE requests a close after the send
 *         resp       - out: command for the caller to forward:
 *                      CONN_NOP (nothing to do), CONN_QUEUED_WRITE,
 *                      CONN_ERROR or CONN_EOF
 *         locked     - non-zero if c->write_lock must not be taken or
 *                      released here; if 0 it is taken at the start and
 *                      released on every path
 * Returns: -1 on error (c is marked S_CONN_BAD and *resp is CONN_ERROR);
 *          len when the data, or its unwritten remainder, was queued;
 *          otherwise the result of the write call.
 *
 * In TCP_ASYNC builds with async enabled the data is appended to the
 * write queue if the queue is non-empty (or, with TCP_CONNECT_WAIT, the
 * connect is pending); otherwise a non-blocking write is tried and any
 * remainder is queued. Without async the data is written with
 * tsend_stream() using the configured send_timeout. */
02539 static int tcpconn_do_send(int fd, struct tcp_connection* c,
02540 const char* buf, unsigned len,
02541 snd_flags_t send_flags, long* resp,
02542 int locked)
02543 {
02544 int n;
02545 #ifdef TCP_ASYNC
02546 int enable_write_watch;
02547 #endif
02548
02549 DBG("tcp_send: sending...\n");
02550 *resp = CONN_NOP;
02551 if (likely(!locked)) lock_get(&c->write_lock);
02552 /* store the current send flags on the connection */
02553 tcpconn_set_send_flags(c, send_flags);
02554 #ifdef TCP_ASYNC
02555 if (likely(cfg_get(tcp, tcp_cfg, async))){
02556 if (_wbufq_non_empty(c)
02557 #ifdef TCP_CONNECT_WAIT
02558 || (c->flags&F_CONN_PENDING)
02559 #endif
02560 ){
02561 if (unlikely(_wbufq_add(c, buf, len)<0)){
02562 if (likely(!locked)) lock_release(&c->write_lock);
02563 n=-1;
02564 goto error;
02565 }
02566 if (likely(!locked)) lock_release(&c->write_lock);
02567 n=len;
02568 goto end;
02569 }
02570 n=_tcpconn_write_nb(fd, c, buf, len);
02571 }else{
02572 #endif
02573 /* non-async: write with tsend_stream(); the timeout argument is send_timeout converted to seconds, times 1000 */
02574 n=tsend_stream(fd, buf, len,
02575 TICKS_TO_S(cfg_get(tcp, tcp_cfg, send_timeout)) *
02576 1000);
02577 #ifdef TCP_ASYNC
02578 }
02579 #else
02580 if (likely(!locked)) lock_release(&c->write_lock);
02581 #endif
02582
02583 DBG("tcp_send: after real write: c= %p n=%d fd=%d\n",c, n, fd);
02584 DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
02585 if (unlikely(n<(int)len)){
02586 #ifdef TCP_ASYNC
02587 if (cfg_get(tcp, tcp_cfg, async) &&
02588 ((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK)){
02589 enable_write_watch=_wbufq_empty(c);
02590 if (n<0) n=0;
02591 else if (unlikely(c->state==S_CONN_CONNECT ||
02592 c->state==S_CONN_ACCEPT)){
02593 TCP_STATS_ESTABLISHED(c->state);
02594 c->state=S_CONN_OK;
02595 }
02596 if (unlikely(_wbufq_add(c, buf+n, len-n)<0)){
02597 if (likely(!locked)) lock_release(&c->write_lock);
02598 n=-1;
02599 goto error;
02600 }
02601 if (likely(!locked)) lock_release(&c->write_lock);
02602 n=len;
02603 if (likely(enable_write_watch))
02604 *resp=CONN_QUEUED_WRITE;
02605 goto end;
02606 }else{
02607 if (likely(!locked)) lock_release(&c->write_lock);
02608 }
02609 #endif
02610 if (unlikely(c->state==S_CONN_CONNECT)){
02611 switch(errno){
02612 case ENETUNREACH:
02613 case EHOSTUNREACH:
02614 #ifdef USE_DST_BLACKLIST
02615 dst_blacklist_su(BLST_ERR_CONNECT, c->rcv.proto,
02616 &c->rcv.src_su, &c->send_flags, 0);
02617 #endif
02618 TCP_EV_CONNECT_UNREACHABLE(errno, TCP_LADDR(c),
02619 TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
02620 break;
02621 case ECONNREFUSED:
02622 case ECONNRESET:
02623 #ifdef USE_DST_BLACKLIST
02624 dst_blacklist_su(BLST_ERR_CONNECT, c->rcv.proto,
02625 &c->rcv.src_su, &c->send_flags, 0);
02626 #endif
02627 TCP_EV_CONNECT_RST(errno, TCP_LADDR(c), TCP_LPORT(c),
02628 TCP_PSU(c), TCP_PROTO(c));
02629 break;
02630 default:
02631 TCP_EV_CONNECT_ERR(errno, TCP_LADDR(c), TCP_LPORT(c),
02632 TCP_PSU(c), TCP_PROTO(c));
02633 }
02634 TCP_STATS_CONNECT_FAILED();
02635 }else{
02636 switch(errno){
02637 case ECONNREFUSED:
02638 case ECONNRESET:
02639 TCP_STATS_CON_RESET();
02640 /* fall through */
02641 case ENETUNREACH:
02642
02643 #ifdef USE_DST_BLACKLIST
02644 dst_blacklist_su(BLST_ERR_SEND, c->rcv.proto,
02645 &c->rcv.src_su, &c->send_flags, 0);
02646 #endif
02647 break;
02648 }
02649 }
02650 LOG(L_ERR, "ERROR: tcp_send: failed to send on %p (%s:%d->%s): %s (%d)"
02651 "\n", c, ip_addr2a(&c->rcv.dst_ip), c->rcv.dst_port,
02652 su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
02653 strerror(errno), errno);
02654 n = -1;
02655 #ifdef TCP_ASYNC
02656 error:
02657 #endif
02658 /* error: mark the connection bad and set its timeout to the current tick */
02659 c->state=S_CONN_BAD;
02660 c->timeout=get_ticks_raw();
02661 /* report the error to the caller */
02662 *resp=CONN_ERROR;
02663 return n;
02664 }
02665 /* the write was complete (n >= len) */
02666 #ifdef TCP_ASYNC
02667 if (likely(!locked)) lock_release(&c->write_lock);
02668 #endif
02669 /* a successful write moves a connecting/accepted connection to S_CONN_OK */
02670 if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT)){
02671 TCP_STATS_ESTABLISHED(c->state);
02672 c->state=S_CONN_OK;
02673 }
02674 if (unlikely(send_flags.f & SND_F_CON_CLOSE)){
02675 /* close requested after this send: mark the connection bad, set its timeout to the current tick */
02676 c->state=S_CONN_BAD;
02677 c->timeout=get_ticks_raw();
02678 /* and report CONN_EOF to the caller */
02679 *resp=CONN_EOF;
02680 return n;
02681 }
02682 end:
02683 return n;
02684 }
02685 /* tcpconn_1st_send(): first write on a connection that is still being set up. Does one non-blocking write of buf[0..len) on fd; on a short write, EAGAIN, EWOULDBLOCK or ENOTCONN the unsent tail is stored with _wbufq_insert(). */
02686 /* c->write_lock is taken around the queue insert unless locked is non-zero (the caller then presumably holds it already -- TODO confirm). send_flags is only checked for SND_F_CON_CLOSE. */
02687 /* Returns len when everything was written or queued, -1 on error. *resp is set to CONN_NEW_COMPLETE, CONN_NEW_PENDING_WRITE, CONN_EOF or CONN_ERROR. */
02716 static int tcpconn_1st_send(int fd, struct tcp_connection* c,
02717 const char* buf, unsigned len,
02718 snd_flags_t send_flags, long* resp,
02719 int locked)
02720 {
02721 int n;
02722
02723 n=_tcpconn_write_nb(fd, c, buf, len);
02724 if (unlikely(n<(int)len)){
02725 /* short write or error: decide between "queue the rest" and "give up" */
02726 /* n>=0 is a partial write; EAGAIN/EWOULDBLOCK/ENOTCONN are handled as */
02727 /* "cannot write yet" instead of as failures */
02728 if ((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK || errno==ENOTCONN){
02729 DBG("pending write on new connection %p "
02730 " (%d/%d bytes written)\n", c, n, len);
02731 if (unlikely(n<0)) n=0;
02732 else{
02733 if (likely(c->state == S_CONN_CONNECT))
02734 TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
02735 c->state=S_CONN_OK;
02736 /* (only reached when send() accepted >= 0 bytes, i.e. did not fail) */
02737 }
02738 /* queue whatever was not written: buf+n, len-n bytes */
02739 if (likely(!locked)) lock_get(&c->write_lock);
02740 if (unlikely(_wbufq_insert(c, buf+n, len-n)<0)){
02741 if (likely(!locked)) lock_release(&c->write_lock);
02742 n=-1;
02743 LOG(L_ERR, "%s: EAGAIN and"
02744 " write queue full or failed for %p\n",
02745 su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)), c);
02746 goto error;
02747 }
02748 if (likely(!locked)) lock_release(&c->write_lock);
02749 /* queued: report the whole buffer as handled */
02750 *resp=CONN_NEW_PENDING_WRITE;
02751 n=len;
02752 goto end;
02753 }
02754 /* hard error: raise the connect event matching errno; for the */
02755 /* unreachable/refused/reset cases dst_blacklist_su() is also called */
02756 /* when USE_DST_BLACKLIST is defined */
02757 switch(errno){
02758 case ENETUNREACH:
02759 case EHOSTUNREACH:
02760 #ifdef USE_DST_BLACKLIST
02761 dst_blacklist_su( BLST_ERR_CONNECT, c->rcv.proto,
02762 &c->rcv.src_su, &c->send_flags, 0);
02763 #endif
02764 TCP_EV_CONNECT_UNREACHABLE(errno, TCP_LADDR(c),
02765 TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
02766 break;
02767 case ECONNREFUSED:
02768 case ECONNRESET:
02769 #ifdef USE_DST_BLACKLIST
02770 dst_blacklist_su( BLST_ERR_CONNECT, c->rcv.proto,
02771 &c->rcv.src_su, &c->send_flags, 0);
02772 #endif
02773 TCP_EV_CONNECT_RST(errno, TCP_LADDR(c),
02774 TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
02775 break;
02776 default:
02777 TCP_EV_CONNECT_ERR(errno, TCP_LADDR(c),
02778 TCP_LPORT(c), TCP_PSU(c), TCP_PROTO(c));
02779 }
02780 /* count the failed connect and log it */
02781 TCP_STATS_CONNECT_FAILED();
02782 LOG(L_ERR, "%s: connect & send for %p failed:" " %s (%d)\n",
02783 su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
02784 c, strerror(errno), errno);
02785 goto error;
02786 }
02787 LOG(L_INFO, "quick connect for %p\n", c); /* everything was written at once */
02788 if (likely(c->state == S_CONN_CONNECT))
02789 TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
02790 if (unlikely(send_flags.f & SND_F_CON_CLOSE)){
02791 /* close-after-send was requested: mark the connection bad */
02792 c->state=S_CONN_BAD;
02793 /* and report EOF instead of "complete" */
02794 *resp=CONN_EOF;
02795 }else{
02796 c->state=S_CONN_OK;
02797 /* normal case: connection set up and message fully sent */
02798 *resp=CONN_NEW_COMPLETE;
02799 }
02800 end:
02801 return n;
02802 error:
02803 *resp=CONN_ERROR;
02804 return -1;
02805 }
02806 /* tcp_init(): create, configure, bind and listen on the TCP socket described by sock_info. */
02807 /* Fills sock_info->su from sock_info->address and sock_info->port_no and stores the descriptor in sock_info->socket. */
02808 /* Returns 0 on success; on failure closes sock_info->socket if it is not -1, sets it to -1 and returns -1. */
02809 int tcp_init(struct socket_info* sock_info)
02810 {
02811 union sockaddr_union* addr;
02812 int optval;
02813 #ifdef HAVE_TCP_ACCEPT_FILTER
02814 struct accept_filter_arg afa;
02815 #endif
02816 #ifdef DISABLE_NAGLE
02817 int flag;
02818 struct protoent* pe;
02819 /* look up the TCP protocol number if it is not known yet (-1) */
02820 if (tcp_proto_no==-1){
02821 pe=getprotobyname("tcp");
02822 if (pe==0){
02823 LOG(L_ERR, "ERROR: tcp_init: could not get TCP protocol number\n");
02824 tcp_proto_no=-1;
02825 }else{
02826 tcp_proto_no=pe->p_proto;
02827 }
02828 }
02829 #endif
02830
02831 addr=&sock_info->su;
02832 /* build the sockaddr from the configured address and port */
02833 if (init_su(addr, &sock_info->address, sock_info->port_no)<0){
02834 LOG(L_ERR, "ERROR: tcp_init: could no init sockaddr_union\n");
02835 goto error;
02836 }
02837 DBG("tcp_init: added %s\n", su2a(addr, sizeof(*addr)));
02838 sock_info->socket=socket(AF2PF(addr->s.sa_family), SOCK_STREAM, 0);
02839 if (sock_info->socket==-1){
02840 LOG(L_ERR, "ERROR: tcp_init: socket: %s\n", strerror(errno));
02841 goto error;
02842 }
02843 #ifdef DISABLE_NAGLE
02844 flag=1; /* TCP_NODELAY is only attempted when the protocol number is known; failure is logged, not fatal */
02845 if ( (tcp_proto_no!=-1) &&
02846 (setsockopt(sock_info->socket, tcp_proto_no , TCP_NODELAY,
02847 &flag, sizeof(flag))<0) ){
02848 LOG(L_ERR, "ERROR: tcp_init: could not disable Nagle: %s\n",
02849 strerror(errno));
02850 }
02851 #endif
02852
02853
02854 #if !defined(TCP_DONT_REUSEADDR)
02855 /* SO_REUSEADDR is set unless TCP_DONT_REUSEADDR is defined. */
02856 /* Unlike the optional settings below, a failure here aborts the */
02857 /* initialisation (goto error). */
02858
02859
02860
02861
02862
02863
02864 optval=1;
02865 if (setsockopt(sock_info->socket, SOL_SOCKET, SO_REUSEADDR,
02866 (void*)&optval, sizeof(optval))==-1) {
02867 LOG(L_ERR, "ERROR: tcp_init: setsockopt %s\n",
02868 strerror(errno));
02869 goto error;
02870 }
02871 #endif
02872 /* type of service, taken from the global tos; failure is only a warning */
02873 optval = tos;
02874 if (setsockopt(sock_info->socket, IPPROTO_IP, IP_TOS, (void*)&optval,
02875 sizeof(optval)) ==-1){
02876 LOG(L_WARN, "WARNING: tcp_init: setsockopt tos: %s\n", strerror(errno));
02877 /* continue anyway */
02878 }
02879 #ifdef HAVE_TCP_DEFER_ACCEPT
02880 /* TCP_DEFER_ACCEPT: only set when the defer_accept config value is non-zero */
02881 if ((optval=cfg_get(tcp, tcp_cfg, defer_accept))){
02882 if (setsockopt(sock_info->socket, IPPROTO_TCP, TCP_DEFER_ACCEPT,
02883 (void*)&optval, sizeof(optval)) ==-1){
02884 LOG(L_WARN, "WARNING: tcp_init: setsockopt TCP_DEFER_ACCEPT %s\n",
02885 strerror(errno));
02886 /* continue anyway */
02887 }
02888 }
02889 #endif
02890 #ifdef HAVE_TCP_SYNCNT
02891 if ((optval=cfg_get(tcp, tcp_cfg, syncnt))){ /* skipped when syncnt is 0 */
02892 if (setsockopt(sock_info->socket, IPPROTO_TCP, TCP_SYNCNT, &optval,
02893 sizeof(optval))<0){
02894 LOG(L_WARN, "WARNING: tcp_init: failed to set"
02895 " maximum SYN retr. count: %s\n", strerror(errno));
02896 }
02897 }
02898 #endif
02899 #ifdef HAVE_TCP_LINGER2
02900 if ((optval=cfg_get(tcp, tcp_cfg, linger2))){ /* skipped when linger2 is 0 */
02901 if (setsockopt(sock_info->socket, IPPROTO_TCP, TCP_LINGER2, &optval,
02902 sizeof(optval))<0){
02903 LOG(L_WARN, "WARNING: tcp_init: failed to set"
02904 " maximum LINGER2 timeout: %s\n", strerror(errno));
02905 }
02906 }
02907 #endif
02908 init_sock_keepalive(sock_info->socket); /* return value (if any) is not checked */
02909 if (bind(sock_info->socket, &addr->s, sockaddru_len(*addr))==-1){
02910 LOG(L_ERR, "ERROR: tcp_init: bind(%x, %p, %d) on %s:%d : %s\n",
02911 sock_info->socket, &addr->s,
02912 (unsigned)sockaddru_len(*addr),
02913 sock_info->address_str.s,
02914 sock_info->port_no,
02915 strerror(errno));
02916 goto error;
02917 }
02918 if (listen(sock_info->socket, TCP_LISTEN_BACKLOG)==-1){
02919 LOG(L_ERR, "ERROR: tcp_init: listen(%x, %p, %d) on %s: %s\n",
02920 sock_info->socket, &addr->s,
02921 (unsigned)sockaddru_len(*addr),
02922 sock_info->address_str.s,
02923 strerror(errno));
02924 goto error;
02925 }
02926 #ifdef HAVE_TCP_ACCEPT_FILTER
02927 /* "dataready" accept filter: enabled by the same defer_accept setting, installed after listen() */
02928 if (cfg_get(tcp, tcp_cfg, defer_accept)){
02929 memset(&afa, 0, sizeof(afa));
02930 strcpy(afa.af_name, "dataready");
02931 if (setsockopt(sock_info->socket, SOL_SOCKET, SO_ACCEPTFILTER,
02932 (void*)&afa, sizeof(afa)) ==-1){
02933 LOG(L_WARN, "WARNING: tcp_init: setsockopt SO_ACCEPTFILTER %s\n",
02934 strerror(errno));
02935 /* continue anyway */
02936 }
02937 }
02938 #endif
02939
02940 return 0;
02941 error:
02942 if (sock_info->socket!=-1){
02943 tcp_safe_close(sock_info->socket);
02944 sock_info->socket=-1;
02945 }
02946 return -1;
02947 }
02948
02949
02950
02951 /* tcpconn_close_main_fd(): close the connection's descriptor (tcpconn->s) and set tcpconn->s to -1. */
02952 /* With USE_TLS, tls_close() runs first for PROTO_TLS; with TCP_FD_CACHE and fd_cache set, shutdown() precedes the close. */
02953 inline static void tcpconn_close_main_fd(struct tcp_connection* tcpconn)
02954 {
02955     const int sock=tcpconn->s;
02956 #ifdef USE_TLS
02957     if (tcpconn->type==PROTO_TLS){
02958         tls_close(tcpconn, sock);
02959     }
02960 #endif
02961 #ifdef TCP_FD_CACHE
02962     if (likely(cfg_get(tcp, tcp_cfg, fd_cache))){ shutdown(sock, SHUT_RDWR); }
02963 #endif
02964     /* a failed close is only logged; the descriptor is forgotten either way */
02965     if (unlikely(tcp_safe_close(sock)<0)){
02966         LOG(L_ERR, "ERROR: tcpconn_close_main_fd(%p): %s "
02967                 "close(%d) failed (flags 0x%x): %s (%d)\n", tcpconn,
02968                 su2a(&tcpconn->rcv.src_su, sizeof(tcpconn->rcv.src_su)),
02969                 sock, tcpconn->flags, strerror(errno), errno);
02970     }
02971     tcpconn->s=-1;
02972 }
02973
02974
02975
02976
02977
02978 /* tcpconn_chld_put(): drop one reference; when it was the last one, verify the flags and free the connection. */
02979 /* Returns 1 if the connection was freed, 0 otherwise. Aborts the process if the flags are inconsistent. */
02980 inline static int tcpconn_chld_put(struct tcp_connection* tcpconn)
02981 {
02982     /* other references remain: nothing more to do */
02983     if (likely(!atomic_dec_and_test(&tcpconn->refcnt)))
02984         return 0;
02985     DBG("tcpconn_chld_put: destroying connection %p (%d, %d) "
02986             "flags %04x\n", tcpconn, tcpconn->id,
02987             tcpconn->s, tcpconn->flags);
02988     membar_read_atomic_op(); /* issued before the flags are inspected */
02989     /* sanity check: the fd must be flagged closed and the connection must */
02990     /* not be hashed, on the main timer or watched for read/write */
02991     if (unlikely((tcpconn->flags & F_CONN_FD_CLOSED)==0 ||
02992                 (tcpconn->flags & (F_CONN_HASHED|F_CONN_MAIN_TIMER|
02993                                    F_CONN_READ_W|F_CONN_WRITE_W))!=0)){
02994         LOG(L_CRIT, "BUG: tcpconn_chld_put: %p bad flags = %0x\n",
02995                 tcpconn, tcpconn->flags);
02996         abort();
02997     }
02998     _tcpconn_free(tcpconn);
02999     return 1;
03000 }
03001
03002
03003
03004 /* tcpconn_destroy(): tear down and free a connection unconditionally (the reference count is not consulted here). */
03005 /* A still-hashed connection is logged as a BUG and then detached (its main timer is deleted first if F_CONN_MAIN_TIMER is set). */
03006 /* If the fd is not flagged closed yet, it is closed and the tcp (and, for PROTO_TLS, tls) connection counters are decremented. */
03007 inline static void tcpconn_destroy(struct tcp_connection* tcpconn)
03008 {
03009 DBG("tcpconn_destroy: destroying connection %p (%d, %d) "
03010 "flags %04x\n", tcpconn, tcpconn->id,
03011 tcpconn->s, tcpconn->flags);
03012 if (unlikely(tcpconn->flags & F_CONN_HASHED)){
03013 LOG(L_CRIT, "BUG: tcpconn_destroy: called with hashed"
03014 " connection (%p)\n", tcpconn);
03015 /* recover anyway: delete the timer, then detach under TCPCONN_LOCK */
03016 if (likely(tcpconn->flags & F_CONN_MAIN_TIMER))
03017 local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
03018 TCPCONN_LOCK;
03019 _tcpconn_detach(tcpconn);
03020 tcpconn->flags &= ~(F_CONN_HASHED|F_CONN_MAIN_TIMER);
03021 TCPCONN_UNLOCK;
03022 }
03023 if (likely(!(tcpconn->flags & F_CONN_FD_CLOSED))){
03024 tcpconn_close_main_fd(tcpconn);
03025 tcpconn->flags|=F_CONN_FD_CLOSED; /* remember it, so the fd is closed and counted only once */
03026 (*tcp_connections_no)--;
03027 if (unlikely(tcpconn->type==PROTO_TLS))
03028 (*tls_connections_no)--;
03029 }
03030 _tcpconn_free(tcpconn);
03031 }
03032
03033
03034
03035
03036
03037
03038
03039
03040
03041 /* tcpconn_put_destroy(): mark the connection bad, close its fd if that was not done yet and drop one reference. */
03042 /* The connection is expected to be unhashed, off the main timer and not watched for I/O: any of those flags */
03043 /* still being set is logged as a BUG (a hashed connection is additionally detached here). */
03044 /* The tcp/tls connection counters are decremented when the fd is closed by this call. */
03045 /* Returns 1 if this was the last reference and the connection was freed, 0 otherwise. */
03046 inline static int tcpconn_put_destroy(struct tcp_connection* tcpconn)
03047 {
03048     if (unlikely((tcpconn->flags &
03049             (F_CONN_WRITE_W|F_CONN_HASHED|F_CONN_MAIN_TIMER|F_CONN_READ_W)) )){
03050         /* unexpected: at least one "still in use" flag is set */
03051         if (unlikely(tcpconn->flags & F_CONN_HASHED)){
03052             LOG(L_CRIT, "BUG: tcpconn_put_destroy: called with hashed and/or"
03053                     " on timer connection (%p), flags = %0x\n",
03054                     tcpconn, tcpconn->flags);
03055             /* try to recover: delete the timer, detach under TCPCONN_LOCK */
03056             if (likely(tcpconn->flags & F_CONN_MAIN_TIMER))
03057                 local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
03058             TCPCONN_LOCK;
03059             _tcpconn_detach(tcpconn);
03060             tcpconn->flags &= ~(F_CONN_HASHED|F_CONN_MAIN_TIMER);
03061             TCPCONN_UNLOCK;
03062         }else{
03063             LOG(L_CRIT, "BUG: tcpconn_put_destroy: %p flags = %0x\n",
03064                     tcpconn, tcpconn->flags);
03065         }
03066     }
03067     tcpconn->state=S_CONN_BAD;
03068     /* timeout := current ticks (presumably so other holders see it as expired -- TODO confirm) */
03069     tcpconn->timeout=get_ticks_raw();
03070     /* close the descriptor only once; F_CONN_FD_CLOSED records that it was done */
03071     if (likely(!(tcpconn->flags & F_CONN_FD_CLOSED))){
03072         tcpconn_close_main_fd(tcpconn);
03073         tcpconn->flags|=F_CONN_FD_CLOSED;
03074         (*tcp_connections_no)--;
03075         if (unlikely(tcpconn->type==PROTO_TLS))
03076             (*tls_connections_no)--;
03077     }
03078     /* The reference is dropped last: once it is gone the connection may be */
03079     /* freed, so it is not touched after the decrement. */
03080     /* mb_atomic_dec_and_test() presumably includes a memory barrier */
03081     /* (going by its name) -- TODO confirm. */
03082     if (unlikely(mb_atomic_dec_and_test(&tcpconn->refcnt))){
03083         _tcpconn_free(tcpconn);
03084         return 1;
03085     }
03086     return 0;
03087 }
03088
03089
03090
03091
03092
03093
03094 /* tcpconn_try_unhash(): if the connection is hashed, mark it S_CONN_BAD, stop its main timer (or, when it has */
03095 /* none, set its timeout to the current ticks), detach it from the hash under TCPCONN_LOCK and, with TCP_ASYNC, */
03096 /* destroy any queued writes. Returns 1 if this call unhashed it; 0 if it was not hashed, or if F_CONN_HASHED */
03097 /* was found cleared once the lock was held. */
03098 inline static int tcpconn_try_unhash(struct tcp_connection* tcpconn)
03099 {
03100 if (likely(tcpconn->flags & F_CONN_HASHED)){
03101 tcpconn->state=S_CONN_BAD;
03102 if (likely(tcpconn->flags & F_CONN_MAIN_TIMER)){
03103 local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
03104 tcpconn->flags&=~F_CONN_MAIN_TIMER;
03105 }else
03106 /* no main timer: the single statement below is the else body */
03107 tcpconn->timeout=get_ticks_raw();
03108 TCPCONN_LOCK;
03109 if (tcpconn->flags & F_CONN_HASHED){ /* re-test, this time with the lock held */
03110 tcpconn->flags&=~F_CONN_HASHED;
03111 _tcpconn_detach(tcpconn);
03112 TCPCONN_UNLOCK;
03113 }else{
03114 /* the flag was cleared between the unlocked test and taking the lock */
03115 TCPCONN_UNLOCK;
03116 return 0;
03117 }
03118 #ifdef TCP_ASYNC
03119 /* drop queued, unsent data; emptiness is first tested without write_lock */
03120 if (unlikely(_wbufq_non_empty(tcpconn))){
03121 lock_get(&tcpconn->write_lock);
03122 /* and tested again with the lock held before destroying the queue */
03123 if (likely(_wbufq_non_empty(tcpconn)))
03124 _wbufq_destroy(&tcpconn->wbuf_q);
03125 lock_release(&tcpconn->write_lock);
03126 }
03127 #endif
03128 return 1;
03129 }
03130 return 0;
03131 }
03132
03133
03134
03135 #ifdef SEND_FD_QUEUE
03136 struct send_fd_info{ /* one pending "pass this connection's fd over a unix socket" request */
03137 struct tcp_connection* tcp_conn; /* connection whose descriptor (tcp_conn->s) is to be sent */
03138 ticks_t expire; /* tick value after which an EAGAIN failure is no longer retried */
03139 int unix_sock; /* unix socket handed to send_fd() */
03140 unsigned int retries; /* incremented each time the entry is kept for another run */
03141 };
03142
03143 struct tcp_send_fd_q{ /* array-backed queue of send_fd_info entries */
03144 struct send_fd_info* data; /* start of the allocated array (0 when not allocated) */
03145 struct send_fd_info* crt; /* next free slot, i.e. one past the last queued entry */
03146 struct send_fd_info* end; /* one past the last allocated slot */
03147 };
03148
03149 /* the single queue instance defined in this part of the file */
03150 static struct tcp_send_fd_q send2child_q;
03151
03152 /* send_fd_queue_init(): allocate room for size entries with pkg_malloc() and mark the queue empty. */
03153 /* Returns 0 on success, -1 (after logging) if the allocation fails; q->data is then 0. */
03154 static int send_fd_queue_init(struct tcp_send_fd_q *q, unsigned int size)
03155 {
03156     q->data=pkg_malloc(size*sizeof(struct send_fd_info));
03157     if (q->data!=0){
03158         q->crt=q->data; /* empty: the next free slot is the first one */
03159         q->end=q->data+size; /* one past the last allocated slot */
03160         return 0;
03161     }
03162     LOG(L_ERR, "ERROR: send_fd_queue_init: out of memory\n");
03163     return -1;
03164 }
03165 /* send_fd_queue_destroy(): free the entry array, if there is one, and reset all three queue pointers to 0. */
03166 static void send_fd_queue_destroy(struct tcp_send_fd_q *q)
03167 {
03168     if (q->data==0)
03169         return; /* no array to free */
03170     pkg_free(q->data);
03171     q->data=0;
03172     q->crt=q->end=0;
03173 }
03174
03175
03176 /* init_send_fd_queues(): set up send2child_q with SEND_FD_QUEUE_SIZE entries. Returns 0 on success, -1 on failure. */
03177 static int init_send_fd_queues(void)
03178 {
03179     /* send2child_q is the only queue initialised here */
03180     if (send_fd_queue_init(&send2child_q, SEND_FD_QUEUE_SIZE)==0)
03181         return 0;
03182
03183     LOG(L_ERR, "ERROR: init_send_fd_queues: init failed\n");
03184     return -1;
03185 }
03186
03187
03188 /* destroy_send_fd_queues(): release the memory held by send2child_q; counterpart of init_send_fd_queues(). */
03189 static void destroy_send_fd_queues(void)
03190 {
03191 send_fd_queue_destroy(&send2child_q);
03192 }
03193
03194
03195 /* send_fd_queue_add(): append the pair (unix_sock, t) to q. A full array is doubled, capped at MAX_SEND_FD_QUEUE_SIZE entries. */
03196 /* Returns 0 on success, -1 if the queue is full at the cap or the reallocation fails (q is left unchanged in both cases). */
03197 inline static int send_fd_queue_add( struct tcp_send_fd_q* q,
03198                                     int unix_sock,
03199                                     struct tcp_connection *t)
03200 {
03201     struct send_fd_info* tmp;
03202     unsigned long new_size;
03203     unsigned long used; /* entries in use, taken before q->data can go stale */
03204
03205     if (q->crt>=q->end){
03206         /* compare element counts: &q->data[new_size] would lie past the allocation */
03207         used=(unsigned long)(q->crt-q->data);
03208         new_size=(unsigned long)(q->end-q->data);
03209         if (new_size< MAX_SEND_FD_QUEUE_SIZE/2) new_size*=2;
03210         else new_size=MAX_SEND_FD_QUEUE_SIZE;
03211         if (unlikely(used>=new_size)){
03212             LOG(L_ERR, "ERROR: send_fd_queue_add: queue full: %ld/%lu\n",
03213                     (long)(used-1), new_size);
03214             return -1;
03215         }
03216         LOG(L_CRIT, "INFO: send_fd_queue: queue full: %ld, extending to %lu\n",
03217                 (long)(q->end-q->data), new_size);
03218         tmp=pkg_realloc(q->data, new_size*sizeof(struct send_fd_info));
03219         if (unlikely(tmp==0)){
03220             LOG(L_ERR, "ERROR: send_fd_queue_add: out of memory\n");
03221             return -1;
03222         }
03223         q->data=tmp; /* old q->data/q->crt values are not used after this point */
03224         q->crt=&tmp[used];
03225         q->end=&tmp[new_size];
03226     }
03227     q->crt->tcp_conn=t;
03228     q->crt->unix_sock=unix_sock;
03229     q->crt->expire=get_ticks_raw()+SEND_FD_QUEUE_TIMEOUT;
03230     q->crt->retries=0;
03231     q->crt++;
03232     return 0;
03233 }
03234 /* send_fd_queue_run(): retry every queued fd hand-off in q. Entries whose send_fd() fails with EAGAIN/EWOULDBLOCK */
03235 /* before their expire tick are kept (compacted towards the front, retries incremented); entries that are sent are dropped; */
03236 /* entries that fail otherwise or too late, or whose connection is bad/closed, have their connection released and destroyed. */
03237 inline static void send_fd_queue_run(struct tcp_send_fd_q* q)
03238 {
03239 struct send_fd_info* p; /* entry being examined */
03240 struct send_fd_info* t; /* where the next kept entry is written */
03241
03242 for (p=t=&q->data[0]; p<q->crt; p++){
03243 if (unlikely(p->tcp_conn->state == S_CONN_BAD ||
03244 p->tcp_conn->flags & F_CONN_FD_CLOSED ||
03245 p->tcp_conn->s ==-1)) {
03246 /* connection already unusable: skip the send and go straight to the cleanup */
03247 goto rm_con;
03248 }
03249 if (unlikely(send_fd(p->unix_sock, &(p->tcp_conn),
03250 sizeof(struct tcp_connection*), p->tcp_conn->s)<=0)){
03251 if ( ((errno==EAGAIN)||(errno==EWOULDBLOCK)) &&
03252 ((s_ticks_t)(p->expire-get_ticks_raw())>0)){
03253 /* would block and not expired yet: keep the entry for the next run */
03254 *t=*p;
03255 t->retries++;
03256 t++;
03257 }else{
03258 LOG(L_ERR, "ERROR: run_send_fd_queue: send_fd failed"
03259 " on socket %d , queue entry %ld, retries %d,"
03260 " connection %p, tcp socket %d, errno=%d (%s) \n",
03261 p->unix_sock, (long)(p-&q->data[0]), p->retries,
03262 p->tcp_conn, p->tcp_conn->s, errno,
03263 strerror(errno));
03264 rm_con:
03265 #ifdef TCP_ASYNC
03266 /* also reached through the goto above; */
03267 /* a write watch that is still registered is removed before */
03268 /* the connection is torn down */
03269 if (p->tcp_conn->flags & F_CONN_WRITE_W){
03270 io_watch_del(&io_h, p->tcp_conn->s, -1, IO_FD_CLOSING);
03271 p->tcp_conn->flags &=~F_CONN_WRITE_W;
03272 }
03273 #endif
03274 p->tcp_conn->flags &= ~F_CONN_READER;
03275 if (likely(tcpconn_try_unhash(p->tcp_conn))) /* unhashed by this call: */
03276 tcpconn_put(p->tcp_conn); /* drop one reference (presumably the hash's -- TODO confirm) */
03277 tcpconn_put_destroy(p->tcp_conn); /* close the fd and drop one more reference */
03278 }
03279 }
03280 }
03281 q->crt=t; /* the queue now holds only the kept entries */
03282 }
03283 #else
03284 #define send_fd_queue_run(q) /* expands to nothing when SEND_FD_QUEUE is not defined */
03285 #endif
03286
03287
03288
03289
03290 /* _tcpconn_write_nb(): one send() of buf[0..len) on fd, restarted for as long as it fails with EINTR. */
03291 /* c is not used here. Returns what send() returned: the number of bytes sent, or a negative value with errno set (never EINTR). */
03292 int _tcpconn_write_nb(int fd, struct tcp_connection* c,
03293                         const char* buf, int len)
03294 {
03295 #ifdef HAVE_MSG_NOSIGNAL
03296     const int snd_flags=MSG_NOSIGNAL;
03297 #else
03298     const int snd_flags=0;
03299 #endif
03300     int written;
03301
03302     /* an interrupting signal is not an error: just issue the call again */
03303     do{
03304         written=send(fd, buf, len, snd_flags);
03305     }while(unlikely(written<0) && errno==EINTR);
03306
03307     /* every other outcome (EAGAIN included) is left to the caller */
03308     return written;
03309 }
03310
03311
03312
03313
03314
03315
03316
03317 /* handle_tcp_child(): process one message of two longs {connection pointer, command} read from a tcp reader child's unix socket. */
03318 /* tcp_c is the child's entry in tcp_children[]; fd_i is passed on to io_watch_del() when the child's socket is dropped. */
03319 /* CONN_RELEASE puts the connection back on the main timer and read watch; CONN_ERROR/CONN_DESTROY/CONN_EOF destroy it. */
03320 /* Returns -1 on a bad child socket or when the child closed it, the negative recv_all() result on a read error, */
03321 /* 0 when nothing usable was read (EAGAIN/EWOULDBLOCK or short read), otherwise the number of bytes read. */
03322 inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
03323 {
03324 struct tcp_connection* tcpconn;
03325 long response[2];
03326 int cmd;
03327 int bytes;
03328 int n;
03329 ticks_t t;
03330 ticks_t crt_timeout;
03331 ticks_t con_lifetime;
03332
03333 if (unlikely(tcp_c->unix_sock<=0)){
03334 /* no valid socket for this child: treated as a bug */
03335 LOG(L_CRIT, "BUG: handle_tcp_child: fd %d for %d "
03336 "(pid %ld, ser no %d)\n", tcp_c->unix_sock,
03337 (int)(tcp_c-&tcp_children[0]), (long)tcp_c->pid,
03338 tcp_c->proc_no);
03339 goto error;
03340 }
03341 /* non-blocking read (MSG_DONTWAIT) of the whole pair: */
03342 /* response[0] = connection pointer, response[1] = command */
03343 bytes=recv_all(tcp_c->unix_sock, response, sizeof(response), MSG_DONTWAIT);
03344 if (unlikely(bytes<(int)sizeof(response))){
03345 if (bytes==0){
03346 /* 0 bytes is handled as "the child is gone" */
03347 DBG("DBG: handle_tcp_child: dead tcp child %d (pid %ld, no %d)"
03348 " (shutting down?)\n", (int)(tcp_c-&tcp_children[0]),
03349 (long)tcp_c->pid, tcp_c->proc_no );
03350 /* stop watching its socket */
03351 io_watch_del(&io_h, tcp_c->unix_sock, fd_i, 0);
03352 goto error;
03353 }else if (bytes<0){
03354 /* EAGAIN/EWOULDBLOCK: nothing to read right now, reported as 0; */
03355 /* any other errno is logged and the negative value is returned */
03356 if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){
03357 LOG(L_CRIT, "ERROR: handle_tcp_child: read from tcp child %ld "
03358 " (pid %ld, no %d) %s [%d]\n",
03359 (long)(tcp_c-&tcp_children[0]), (long)tcp_c->pid,
03360 tcp_c->proc_no, strerror(errno), errno );
03361 }else{
03362 bytes=0;
03363 }
03364
03365 goto end;
03366 }else{
03367 /* fewer bytes than a full message */
03368 LOG(L_CRIT, "BUG: handle_tcp_child: too few bytes received (%d)\n",
03369 bytes );
03370 bytes=0;
03371 /* the partial data is discarded */
03372
03373 goto end;
03374 }
03375 }
03376
03377 DBG("handle_tcp_child: reader response= %lx, %ld from %d \n",
03378 response[0], response[1], (int)(tcp_c-&tcp_children[0]));
03379 cmd=response[1];
03380 tcpconn=(struct tcp_connection*)response[0];
03381 if (unlikely(tcpconn==0)){
03382 /* a null connection pointer is never acted upon */
03383 LOG(L_CRIT, "BUG: handle_tcp_child: null tcpconn pointer received"
03384 " from tcp child %d (pid %ld): %lx, %lx\n",
03385 (int)(tcp_c-&tcp_children[0]), (long)tcp_c->pid,
03386 response[0], response[1]) ;
03387 goto end;
03388 }
03389 switch(cmd){
03390 case CONN_RELEASE:
03391 tcp_c->busy--;
03392 if (unlikely(tcpconn_put(tcpconn))){
03393 /* tcpconn_put() returned non-zero (presumably that was the */
03394 /* last reference -- TODO confirm): the connection is */
03395 /* destroyed on the spot */
03396 tcpconn_destroy(tcpconn);
03397 break;
03398 }
03399 if (unlikely(tcpconn->state==S_CONN_BAD)){
03400 if (tcpconn_try_unhash(tcpconn)) {
03401 #ifdef TCP_ASYNC
03402 if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
03403 io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
03404 tcpconn->flags &= ~F_CONN_WRITE_W;
03405 }
03406 #endif
03407 tcpconn_put_destroy(tcpconn);
03408 }
03409 #ifdef TCP_ASYNC
03410 else if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
03411 /* not unhashed by this call, yet still flagged as watched */
03412 /* for write: reported as a bug and the watch is removed */
03413 BUG("unhashed connection watched for write\n");
03414 io_watch_del(&io_h, tcpconn->s, -1, 0);
03415 tcpconn->flags &= ~F_CONN_WRITE_W;
03416 }
03417 #endif
03418 break;
03419 }
03420 /* connection is fine: give it a fresh lifetime counted from now */
03421 t=get_ticks_raw();
03422 con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
03423 tcpconn->timeout=t+con_lifetime;
03424 crt_timeout=con_lifetime;
03425 #ifdef TCP_ASYNC
03426 if (unlikely(cfg_get(tcp, tcp_cfg, async) &&
03427 _wbufq_non_empty(tcpconn) )){
03428 if (unlikely(TICKS_GE(t, tcpconn->wbuf_q.wr_timeout))){
03429 DBG("handle_tcp_child: wr. timeout on CONN_RELEASE for %p "
03430 "refcnt= %d\n", tcpconn,
03431 atomic_get(&tcpconn->refcnt));
03432 /* queued write timed out: handled as a connect or a send failure, depending on the state */
03433 if (unlikely(tcpconn->state==S_CONN_CONNECT)){
03434 #ifdef USE_DST_BLACKLIST
03435 dst_blacklist_su( BLST_ERR_CONNECT,
03436 tcpconn->rcv.proto,
03437 &tcpconn->rcv.src_su,
03438 &tcpconn->send_flags, 0);
03439 #endif
03440 TCP_EV_CONNECT_TIMEOUT(0, TCP_LADDR(tcpconn),
03441 TCP_LPORT(tcpconn), TCP_PSU(tcpconn),
03442 TCP_PROTO(tcpconn));
03443 TCP_STATS_CONNECT_FAILED();
03444 }else{
03445 #ifdef USE_DST_BLACKLIST
03446 dst_blacklist_su( BLST_ERR_SEND,
03447 tcpconn->rcv.proto,
03448 &tcpconn->rcv.src_su,
03449 &tcpconn->send_flags, 0);
03450 #endif
03451 TCP_EV_SEND_TIMEOUT(0, &tcpconn->rcv);
03452 TCP_STATS_SEND_TIMEOUT();
03453 }
03454 if (tcpconn_try_unhash(tcpconn)) {
03455 if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
03456 io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
03457 tcpconn->flags&=~F_CONN_WRITE_W;
03458 }
03459 tcpconn_put_destroy(tcpconn);
03460 } else if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
03461 BUG("unhashed connection watched for write\n");
03462 io_watch_del(&io_h, tcpconn->s, -1, 0);
03463 tcpconn->flags&=~F_CONN_WRITE_W;
03464 }
03465 break;
03466 }else{
03467 crt_timeout=MIN_unsigned(con_lifetime,
03468 tcpconn->wbuf_q.wr_timeout-t); /* whichever expires first: lifetime or write timeout */
03469 }
03470 }
03471 #endif
03472 /* re-arm the main timer for this connection with crt_timeout */
03473 tcpconn->timer.f=tcpconn_main_timeout;
03474 local_timer_reinit(&tcpconn->timer);
03475 local_timer_add(&tcp_main_ltimer, &tcpconn->timer, crt_timeout, t);
03476 /* set MAIN_TIMER|READ_W|WANTS_RD, clear READER|OOB_DATA */
03477 tcpconn->flags|=(F_CONN_MAIN_TIMER|F_CONN_READ_W|F_CONN_WANTS_RD);
03478 tcpconn->flags&=~(F_CONN_READER|F_CONN_OOB_DATA);
03479 #ifdef TCP_ASYNC
03480 if (unlikely(tcpconn->flags & F_CONN_WRITE_W)) /* fd already watched (for write): change the event set */
03481 n=io_watch_chg(&io_h, tcpconn->s, POLLIN| POLLOUT, -1);
03482 else
03483 #endif
03484 n=io_watch_add(&io_h, tcpconn->s, POLLIN, F_TCPCONN, tcpconn);
03485 if (unlikely(n<0)){
03486 LOG(L_CRIT, "ERROR: tcp_main: handle_tcp_child: failed to add"
03487 " new socket to the fd list\n");
03488 tcpconn->flags&=~F_CONN_READ_W;
03489 if (tcpconn_try_unhash(tcpconn)) {
03490 #ifdef TCP_ASYNC
03491 if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
03492 io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
03493 tcpconn->flags&=~F_CONN_WRITE_W;
03494 }
03495 #endif
03496 tcpconn_put_destroy(tcpconn);
03497 }
03498 #ifdef TCP_ASYNC
03499 else if (unlikely(tcpconn->flags & F_CONN_WRITE_W)) {
03500 BUG("unhashed connection watched for write\n");
03501 io_watch_del(&io_h, tcpconn->s, -1, 0);
03502 tcpconn->flags&=~F_CONN_WRITE_W;
03503 }
03504 #endif
03505 break;
03506 }
03507 DBG("handle_tcp_child: CONN_RELEASE %p refcnt= %d\n",
03508 tcpconn, atomic_get(&tcpconn->refcnt));
03509 break;
03510 case CONN_ERROR:
03511 case CONN_DESTROY:
03512 case CONN_EOF:
03513 /* all three commands end with the connection being destroyed */
03514 tcp_c->busy--;
03515 /* with TCP_ASYNC a registered write watch is removed first, */
03516 /* but only while the fd is still valid (s != -1) */
03517
03518
03519 #ifdef TCP_ASYNC
03520 if ((tcpconn->flags & F_CONN_WRITE_W) && (tcpconn->s!=-1)){
03521 io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
03522 tcpconn->flags&=~F_CONN_WRITE_W;
03523 }
03524 #endif
03525 if (tcpconn_try_unhash(tcpconn)) /* unhashed by this call: */
03526 tcpconn_put(tcpconn); /* drop one reference (presumably the hash's -- TODO confirm) */
03527 tcpconn_put_destroy(tcpconn); /* close the fd and drop one more reference */
03528 break;
03529 default:
03530 LOG(L_CRIT, "BUG: handle_tcp_child: unknown cmd %d"
03531 " from tcp reader %d\n",
03532 cmd, (int)(tcp_c-&tcp_children[0]));
03533 }
03534 end:
03535 return bytes;
03536 error:
03537 return -1;
03538 }
03539
03540
03541
03542
03543
03544
03545
03546
03547
03548
03549
03550 /* handle_ser_child(): process one message {connection pointer, command}, optionally accompanied by an fd, received from process p over its unix socket. */
03551 /* Commands: CONN_ERROR/CONN_EOF (destroy), CONN_GET_FD (send the connection's fd back), CONN_NEW (adopt a new connection and its fd); */
03552 /* with TCP_ASYNC also CONN_QUEUED_WRITE and, with TCP_CONNECT_WAIT on top, CONN_NEW_COMPLETE/CONN_NEW_PENDING_WRITE. */
03553 /* fd_i is passed on to io_watch_del() when the process' socket is dropped. */
03554 /* Returns 1 when a full message was read, 0 on EAGAIN/EWOULDBLOCK or a short read, -1 on a bad or closed socket or a read error. */
03555 inline static int handle_ser_child(struct process_table* p, int fd_i)
03556 {
03557 struct tcp_connection* tcpconn;
03558 struct tcp_connection* tmp;
03559 long response[2];
03560 int cmd;
03561 int bytes;
03562 int ret;
03563 int fd;
03564 int flags;
03565 ticks_t t;
03566 ticks_t con_lifetime;
03567 #ifdef TCP_ASYNC
03568 ticks_t nxt_timeout;
03569 #endif
03570
03571 ret=-1;
03572 if (unlikely(p->unix_sock<=0)){
03573 /* no valid socket for this process: treated as a bug */
03574 LOG(L_CRIT, "BUG: handle_ser_child: fd %d for %d "
03575 "(pid %d)\n", p->unix_sock, (int)(p-&pt[0]), p->pid);
03576 goto error;
03577 }
03578 /* non-blocking read (MSG_DONTWAIT) of the pair plus, possibly, a passed */
03579 /* descriptor which receive_fd() stores in fd (the code below treats */
03580 /* fd==-1 as "no descriptor received") */
03581 bytes=receive_fd(p->unix_sock, response, sizeof(response), &fd,
03582 MSG_DONTWAIT);
03583 if (unlikely(bytes<(int)sizeof(response))){
03584
03585 if (bytes==0){
03586 /* 0 bytes is handled as "the process is gone" */
03587 DBG("DBG: handle_ser_child: dead child %d, pid %d"
03588 " (shutting down?)\n", (int)(p-&pt[0]), p->pid);
03589 /* stop watching its socket */
03590 io_watch_del(&io_h, p->unix_sock, fd_i, 0);
03591 goto error;
03592 }else if (bytes<0){
03593 /* EAGAIN/EWOULDBLOCK: nothing to read right now (ret=0); */
03594 /* any other errno is logged and reported as -1 */
03595 if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){
03596 LOG(L_CRIT, "ERROR: handle_ser_child: read from child %d "
03597 "(pid %d): %s [%d]\n", (int)(p-&pt[0]), p->pid,
03598 strerror(errno), errno);
03599 ret=-1;
03600 }else{
03601 ret=0;
03602 }
03603
03604 goto end;
03605 }else{
03606 /* fewer bytes than a full message */
03607 LOG(L_CRIT, "BUG: handle_ser_child: too few bytes received (%d)\n",
03608 bytes );
03609 ret=0;
03610 /* the partial data is discarded */
03611
03612 goto end;
03613 }
03614 }
03615 ret=1;
03616 DBG("handle_ser_child: read response= %lx, %ld, fd %d from %d (%d)\n",
03617 response[0], response[1], fd, (int)(p-&pt[0]), p->pid);
03618 cmd=response[1];
03619 tcpconn=(struct tcp_connection*)response[0];
03620 if (unlikely(tcpconn==0)){
03621 LOG(L_CRIT, "BUG: handle_ser_child: null tcpconn pointer received"
03622 " from child %d (pid %d): %lx, %lx\n",
03623 (int)(p-&pt[0]), p->pid, response[0], response[1]) ;
03624 goto end;
03625 }
03626 switch(cmd){
03627 case CONN_ERROR:
03628 LOG(L_ERR, "handle_ser_child: ERROR: received CON_ERROR for %p"
03629 " (id %d), refcnt %d, flags 0x%0x\n",
03630 tcpconn, tcpconn->id, atomic_get(&tcpconn->refcnt),
03631 tcpconn->flags); /* no break: continues into CONN_EOF */
03632 case CONN_EOF:
03633 #ifdef TCP_CONNECT_WAIT
03634 /* F_CONN_PENDING is cleared in this function only by the */
03635 /* CONN_NEW_COMPLETE/CONN_NEW_PENDING_WRITE case, which is also */
03636 /* where fd, timer and I/O watch get set up. For a connection that */
03637 /* is still pending only the reference is dropped, and the */
03638 /* connection is destroyed if tcpconn_put() returns non-zero. */
03639
03640 if (unlikely(tcpconn->flags & F_CONN_PENDING)) {
03641 if (tcpconn_put(tcpconn))
03642 tcpconn_destroy(tcpconn);
03643
03644
03645 break;
03646 }
03647 #endif
03648 if ( tcpconn_try_unhash(tcpconn) ) /* unhashed by this call: */
03649 tcpconn_put(tcpconn); /* drop one reference (presumably the hash's -- TODO confirm) */
03650 if ( ((tcpconn->flags & (F_CONN_WRITE_W|F_CONN_READ_W)) ) &&
03651 (tcpconn->s!=-1)){
03652 io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
03653 tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W);
03654 }
03655 tcpconn_put_destroy(tcpconn);
03656 break;
03657 case CONN_GET_FD:
03658 /* the process asks for the descriptor of an existing connection; */
03659 /* every path below sends it exactly one answer, unless the */
03660 /* sending itself fails */
03661 if (unlikely(tcpconn->state == S_CONN_BAD ||
03662 (tcpconn->flags & F_CONN_FD_CLOSED) ||
03663 tcpconn->s ==-1)) {
03664 /* connection unusable: answer with a null pointer and no fd */
03665
03666
03667 tmp = 0;
03668 if (unlikely(send_all(p->unix_sock, &tmp, sizeof(tmp)) <= 0))
03669 BUG("handle_ser_child: CONN_GET_FD: send_all failed\n");
03670
03671
03672 } else if (unlikely(send_fd(p->unix_sock, &tcpconn,
03673 sizeof(tcpconn), tcpconn->s)<=0)){
03674 LOG(L_ERR, "handle_ser_child: CONN_GET_FD:"
03675 " send_fd failed\n");
03676 /* passing the fd failed: fall back to the null-pointer answer */
03677 tmp = 0;
03678 if (unlikely(send_all(p->unix_sock, &tmp, sizeof(tmp)) <= 0))
03679 BUG("handle_ser_child: CONN_GET_FD:"
03680 " send_fd send_all fallback failed\n");
03681 }
03682 break;
03683 case CONN_NEW:
03684 /* a new connection is handed over together with its descriptor; */
03685 /* without a descriptor it is flagged fd-closed and destroyed */
03686
03687 if (unlikely(fd==-1)){
03688 LOG(L_CRIT, "BUG: handle_ser_child: CONN_NEW:"
03689 " no fd received\n");
03690 tcpconn->flags|=F_CONN_FD_CLOSED;
03691 tcpconn_put_destroy(tcpconn);
03692 break;
03693 }
03694 (*tcp_connections_no)++;
03695 if (unlikely(tcpconn->type==PROTO_TLS))
03696 (*tls_connections_no)++;
03697 tcpconn->s=fd;
03698 /* register the connection through tcpconn_add() */
03699 tcpconn_add(tcpconn);
03700 /* start its lifetime, counted from now */
03701 t=get_ticks_raw();
03702 con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
03703 tcpconn->timeout=t+con_lifetime;
03704
03705
03706 local_timer_add(&tcp_main_ltimer, &tcpconn->timer,
03707 con_lifetime, t);
03708 tcpconn->flags|=(F_CONN_MAIN_TIMER|F_CONN_READ_W|F_CONN_WANTS_RD)
03709 #ifdef TCP_ASYNC
03710 /* branch-free select: !(flags & WANTS_WR) is 0 when WANTS_WR is */
03711 /* set, so (0-1) is all ones and the mask yields F_CONN_WRITE_W; */
03712 /* otherwise (1-1) is 0 and nothing is added */
03713 | (((int)!(tcpconn->flags & F_CONN_WANTS_WR)-1)&
03714 F_CONN_WRITE_W)
03715 #endif
03716 ;
03717 tcpconn->flags&=~F_CONN_FD_CLOSED;
03718 flags=POLLIN
03719 #ifdef TCP_ASYNC
03720 /* same trick for the poll events: POLLOUT is added only when */
03721 /* F_CONN_WANTS_WR is set */
03722
03723 | (((int)!(tcpconn->flags & F_CONN_WANTS_WR)-1) & POLLOUT)
03724 #endif
03725 ;
03726 if (unlikely(
03727 io_watch_add(&io_h, tcpconn->s, flags,
03728 F_TCPCONN, tcpconn)<0)){
03729 LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed to add"
03730 " new socket to the fd list\n");
03731 tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W);
03732 tcpconn_try_unhash(tcpconn);
03733 tcpconn_put_destroy(tcpconn);
03734 }
03735 break;
03736 #ifdef TCP_ASYNC
03737 case CONN_QUEUED_WRITE:
03738 /* request to start watching the connection for write. */
03739 /* One reference is dropped first; if tcpconn_put() returns */
03740 /* non-zero the connection is destroyed instead. */
03741
03742
03743
03744
03745 if (unlikely(tcpconn_put(tcpconn))){
03746 tcpconn_destroy(tcpconn);
03747 break;
03748 }
03749 if (unlikely((tcpconn->state==S_CONN_BAD) ||
03750 !(tcpconn->flags & F_CONN_HASHED) ))
03751 /* bad or no longer hashed: nothing is set up (the break below is the if body) */
03752 break;
03753 if (!(tcpconn->flags & F_CONN_WANTS_WR)){
03754 tcpconn->flags|=F_CONN_WANTS_WR;
03755 t=get_ticks_raw();
03756 if (likely((tcpconn->flags & F_CONN_MAIN_TIMER) &&
03757 (TICKS_LT(tcpconn->wbuf_q.wr_timeout, tcpconn->timeout)) &&
03758 TICKS_LT(t, tcpconn->wbuf_q.wr_timeout) )){
03759 /* the write timeout lies in the future but before the current */
03760 /* timeout: restart the timer so that it fires at wr_timeout */
03761 local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
03762 local_timer_reinit(&tcpconn->timer);
03763 local_timer_add(&tcp_main_ltimer, &tcpconn->timer,
03764 tcpconn->wbuf_q.wr_timeout-t, t);
03765 DBG("tcp_main: handle_ser_child: CONN_QUEUED_WRITE; %p "
03766 "timeout adjusted to %d s\n", tcpconn,
03767 TICKS_TO_S(tcpconn->wbuf_q.wr_timeout-t));
03768 }
03769 if (!(tcpconn->flags & F_CONN_WRITE_W)){
03770 tcpconn->flags|=F_CONN_WRITE_W;
03771 if (!(tcpconn->flags & F_CONN_READ_W)){ /* fd not watched at all yet: add it */
03772 if (unlikely(io_watch_add(&io_h, tcpconn->s, POLLOUT,
03773 F_TCPCONN, tcpconn)<0)){
03774 LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child:"
03775 " failed to enable write watch on"
03776 " socket\n");
03777 if (tcpconn_try_unhash(tcpconn))
03778 tcpconn_put_destroy(tcpconn);
03779 break;
03780 }
03781 }else{ /* already watched for read: extend the event set */
03782 if (unlikely(io_watch_chg(&io_h, tcpconn->s,
03783 POLLIN|POLLOUT, -1)<0)){
03784 LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child:"
03785 " failed to change socket watch events\n");
03786 if (tcpconn_try_unhash(tcpconn)) {
03787 io_watch_del(&io_h, tcpconn->s, -1,
03788 IO_FD_CLOSING);
03789 tcpconn->flags&=~F_CONN_READ_W;
03790 tcpconn_put_destroy(tcpconn);
03791 } else {
03792 BUG("unhashed connection watched for IO\n");
03793 io_watch_del(&io_h, tcpconn->s, -1, 0);
03794 tcpconn->flags&=~F_CONN_READ_W;
03795 }
03796 break;
03797 }
03798 }
03799 }
03800 }else{
03801 LOG(L_WARN, "tcp_main: handler_ser_child: connection %p"
03802 " already watched for write\n", tcpconn);
03803 }
03804 break;
03805 #ifdef TCP_CONNECT_WAIT
03806 case CONN_NEW_COMPLETE:
03807 case CONN_NEW_PENDING_WRITE:
03808 /* a pending connection is handed over with its descriptor: */
03809 /* F_CONN_PENDING and F_CONN_FD_CLOSED are cleared, then the fd, */
03810 /* timer and I/O watch are set up. A bad state or a missing fd */
03811 /* leads to the connection being destroyed. */
03812
03813 tcpconn->flags&=~(F_CONN_PENDING|F_CONN_FD_CLOSED);
03814 if (unlikely((tcpconn->state==S_CONN_BAD) || (fd==-1))){
03815 if (unlikely(fd==-1))
03816 LOG(L_CRIT, "BUG: handle_ser_child: CONN_NEW_COMPLETE:"
03817 " no fd received\n");
03818 else
03819 LOG(L_WARN, "WARNING: handle_ser_child: CONN_NEW_COMPLETE:"
03820 " received connection with error\n");
03821 tcpconn->flags|=F_CONN_FD_CLOSED;
03822 tcpconn->state=S_CONN_BAD;
03823 tcpconn_try_unhash(tcpconn);
03824 tcpconn_put_destroy(tcpconn);
03825 break;
03826 }
03827 (*tcp_connections_no)++;
03828 if (unlikely(tcpconn->type==PROTO_TLS))
03829 (*tls_connections_no)++;
03830 tcpconn->s=fd;
03831 /* lifetime counted from now; nxt_timeout may be shortened below */
03832 t=get_ticks_raw();
03833 con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
03834 tcpconn->timeout=t+con_lifetime;
03835 nxt_timeout=con_lifetime;
03836 if (unlikely(cmd==CONN_NEW_COMPLETE)){
03837 /* look at the write queue with write_lock held */
03838 lock_get(&tcpconn->write_lock);
03839 /* POLLOUT unless _wbufq_empty() is 1 (assumes it returns 0 or 1 -- TODO confirm) */
03840 flags=(_wbufq_empty(tcpconn)-1)&POLLOUT;
03841 lock_release(&tcpconn->write_lock);
03842 if (flags){
03843 if (TICKS_LT(tcpconn->wbuf_q.wr_timeout, tcpconn->timeout)
03844 && TICKS_LT(t, tcpconn->wbuf_q.wr_timeout))
03845 nxt_timeout=tcpconn->wbuf_q.wr_timeout-t;
03846 tcpconn->flags|=F_CONN_WRITE_W|F_CONN_WANTS_WR;
03847 }
03848
03849
03850 local_timer_add(&tcp_main_ltimer, &tcpconn->timer, nxt_timeout,
03851 t);
03852 tcpconn->flags|=F_CONN_MAIN_TIMER|F_CONN_READ_W|
03853 F_CONN_WANTS_RD;
03854 }else{
03855 /* CONN_NEW_PENDING_WRITE: the write watch is requested */
03856 /* unconditionally, without looking at the queue */
03857 flags=POLLOUT;
03858 if (TICKS_LT(tcpconn->wbuf_q.wr_timeout, tcpconn->timeout)
03859 && TICKS_LT(t, tcpconn->wbuf_q.wr_timeout))
03860 nxt_timeout=tcpconn->wbuf_q.wr_timeout-t;
03861
03862
03863 local_timer_add(&tcp_main_ltimer, &tcpconn->timer, nxt_timeout,
03864 t);
03865 tcpconn->flags|=F_CONN_MAIN_TIMER|F_CONN_READ_W|
03866 F_CONN_WANTS_RD |
03867 F_CONN_WRITE_W|F_CONN_WANTS_WR;
03868 }
03869 flags|=POLLIN; /* reads are watched in both cases */
03870 if (unlikely(
03871 io_watch_add(&io_h, tcpconn->s, flags,
03872 F_TCPCONN, tcpconn)<0)){
03873 LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed to add"
03874 " new socket to the fd list\n");
03875 tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W);
03876 tcpconn_try_unhash(tcpconn);
03877 tcpconn_put_destroy(tcpconn);
03878 }
03879 break;
03880 #endif
03881 #endif
03882 default:
03883 LOG(L_CRIT, "BUG: handle_ser_child: unknown cmd %d\n", cmd);
03884 }
03885 end:
03886 return ret;
03887 error:
03888 return -1;
03889 }
03890
03891
03892
03893
03894 inline static int send2child(struct tcp_connection* tcpconn)
03895 {
03896 int i;
03897 int min_busy;
03898 int idx;
03899 int wfirst;
03900 int wlast;
03901 static int crt=0;
03902 int last;
03903
03904 if(likely(tcp_sockets_gworkers==0)) {
03905
03906
03907 min_busy=tcp_children[0].busy;
03908 idx=0;
03909 last=crt+tcp_children_no;
03910 for (; crt<last; crt++){
03911 i=crt%tcp_children_no;
03912 if (!tcp_children[i].busy){
03913 idx=i;
03914 min_busy=0;
03915 break;
03916 }else if (min_busy>tcp_children[i].busy){
03917 min_busy=tcp_children[i].busy;
03918 idx=i;
03919 }
03920 }
03921 crt=idx+1;
03922 } else {
03923
03924
03925
03926 if(tcpconn->rcv.bind_address->workers>0) {
03927 wfirst = tcpconn->rcv.bind_address->workers_tcpidx;
03928 wlast = wfirst + tcpconn->rcv.bind_address->workers;
03929 LM_DBG("===== checking per-socket specific workers (%d/%d..%d/%d) [%s]\n",
03930 tcp_children[wfirst].pid, tcp_children[wfirst].proc_no,
03931 tcp_children[wlast-1].pid, tcp_children[wlast-1].proc_no,
03932 tcpconn->rcv.bind_address->sock_str.s);
03933 } else {
03934 wfirst = 0;
03935 wlast = tcp_sockets_gworkers - 1;
03936 LM_DBG("+++++ checking per-socket generic workers (%d/%d..%d/%d) [%s]\n",
03937 tcp_children[wfirst].pid, tcp_children[wfirst].proc_no,
03938 tcp_children[wlast-1].pid, tcp_children[wlast-1].proc_no,
03939 tcpconn->rcv.bind_address->sock_str.s);
03940 }
03941 idx = wfirst;
03942 min_busy = tcp_children[idx].busy;
03943 for(i=wfirst; i<wlast; i++) {
03944 if (!tcp_children[i].busy){
03945 idx=i;
03946 min_busy=0;
03947 break;
03948 } else {
03949 if (min_busy>tcp_children[i].busy) {
03950 min_busy=tcp_children[i].busy;
03951 idx=i;
03952 }
03953 }
03954 }
03955 }
03956
03957 tcp_children[idx].busy++;
03958 tcp_children[idx].n_reqs++;
03959 if (unlikely(min_busy)){
03960 DBG("WARNING: send2child: no free tcp receiver, "
03961 " connection passed to the least busy one (%d)\n",
03962 min_busy);
03963 }
03964 LM_DBG("selected tcp worker %d %d(%ld) for activity on [%s], %p\n",
03965 idx, tcp_children[idx].proc_no, (long)tcp_children[idx].pid,
03966 tcpconn->rcv.bind_address->sock_str.s, tcpconn);
03967
03968
03969
03970
03971
03972 while(unlikely((tcpconn->state != S_CONN_BAD) &&
03973 (handle_ser_child(&pt[tcp_children[idx].proc_no], -1)>0)));
03974
03975 while(unlikely((tcpconn->state != S_CONN_BAD &&
03976 (handle_tcp_child(&tcp_children[idx], -1)>0))));
03977
03978
03979
03980
03981
03982
03983 if (unlikely(tcpconn->state == S_CONN_BAD ||
03984 (tcpconn->flags & F_CONN_FD_CLOSED)))
03985 return -1;
03986 #ifdef SEND_FD_QUEUE
03987
03988 if (unlikely(send_fd(tcp_children[idx].unix_sock, &tcpconn,
03989 sizeof(tcpconn), tcpconn->s)<=0)){
03990 if ((errno==EAGAIN)||(errno==EWOULDBLOCK)){
03991
03992 LOG(L_CRIT, "INFO: tcp child %d, socket %d: queue full,"
03993 " %d requests queued (total handled %d)\n",
03994 idx, tcp_children[idx].unix_sock, min_busy,
03995 tcp_children[idx].n_reqs-1);
03996 if (send_fd_queue_add(&send2child_q, tcp_children[idx].unix_sock,
03997 tcpconn)!=0){
03998 LOG(L_ERR, "ERROR: send2child: queue send op. failed\n");
03999 return -1;
04000 }
04001 }else{
04002 LOG(L_ERR, "ERROR: send2child: send_fd failed for %p (flags 0x%0x)"
04003 ", fd %d\n", tcpconn, tcpconn->flags, tcpconn->s);
04004 return -1;
04005 }
04006 }
04007 #else
04008 if (unlikely(send_fd(tcp_children[idx].unix_sock, &tcpconn,
04009 sizeof(tcpconn), tcpconn->s)<=0)){
04010 LOG(L_ERR, "ERROR: send2child: send_fd failed for %p (flags 0x%0x)"
04011 ", fd %d\n", tcpconn, tcpconn->flags, tcpconn->s);
04012 return -1;
04013 }
04014 #endif
04015
04016 return 0;
04017 }
04018
04019
04020
04021
04022
04023
04024
04025
04026
04027
04028 static inline int handle_new_connect(struct socket_info* si)
04029 {
04030 union sockaddr_union su;
04031 union sockaddr_union sock_name;
04032 unsigned sock_name_len;
04033 union sockaddr_union* dst_su;
04034 struct tcp_connection* tcpconn;
04035 socklen_t su_len;
04036 int new_sock;
04037
04038
04039 su_len=sizeof(su);
04040 new_sock=accept(si->socket, &(su.s), &su_len);
04041 if (unlikely(new_sock==-1)){
04042 if ((errno==EAGAIN)||(errno==EWOULDBLOCK))
04043 return 0;
04044 LOG(L_ERR, "WARNING: handle_new_connect: error while accepting"
04045 " connection(%d): %s\n", errno, strerror(errno));
04046 return -1;
04047 }
04048 if (unlikely(*tcp_connections_no>=cfg_get(tcp, tcp_cfg, max_connections))){
04049 LOG(L_ERR, "ERROR: maximum number of connections exceeded: %d/%d\n",
04050 *tcp_connections_no,
04051 cfg_get(tcp, tcp_cfg, max_connections));
04052 tcp_safe_close(new_sock);
04053 TCP_STATS_LOCAL_REJECT();
04054 return 1;
04055 }
04056 if (unlikely(si->proto==PROTO_TLS)) {
04057 if (unlikely(*tls_connections_no>=cfg_get(tcp, tcp_cfg, max_tls_connections))){
04058 LM_ERR("maximum number of tls connections exceeded: %d/%d\n",
04059 *tls_connections_no,
04060 cfg_get(tcp, tcp_cfg, max_tls_connections));
04061 tcp_safe_close(new_sock);
04062 TCP_STATS_LOCAL_REJECT();
04063 return 1;
04064 }
04065 }
04066 if (unlikely(init_sock_opt_accept(new_sock)<0)){
04067 LOG(L_ERR, "ERROR: handle_new_connect: init_sock_opt failed\n");
04068 tcp_safe_close(new_sock);
04069 return 1;
04070 }
04071 (*tcp_connections_no)++;
04072 if (unlikely(si->proto==PROTO_TLS))
04073 (*tls_connections_no)++;
04074
04075
04076
04077
04078
04079
04080
04081
04082
04083
04084 dst_su=&si->su;
04085 if (unlikely(si->flags & SI_IS_ANY)){
04086
04087 sock_name_len=sizeof(sock_name);
04088 if (getsockname(new_sock, &sock_name.s, &sock_name_len)!=0){
04089 LOG(L_ERR, "ERROR: handle_new_connect:"
04090 " getsockname failed: %s(%d)\n",
04091 strerror(errno), errno);
04092
04093 }else{
04094 dst_su=&sock_name;
04095 }
04096 }
04097
04098 tcpconn=tcpconn_new(new_sock, &su, dst_su, si, si->proto, S_CONN_ACCEPT);
04099 if (likely(tcpconn)){
04100 tcpconn->flags|=F_CONN_PASSIVE;
04101 #ifdef TCP_PASS_NEW_CONNECTION_ON_DATA
04102 atomic_set(&tcpconn->refcnt, 1);
04103
04104 tcpconn_add(tcpconn);
04105
04106 local_timer_add(&tcp_main_ltimer, &tcpconn->timer,
04107 cfg_get(tcp, tcp_cfg, con_lifetime),
04108 get_ticks_raw());
04109 tcpconn->flags|=(F_CONN_MAIN_TIMER|F_CONN_READ_W|F_CONN_WANTS_RD);
04110 if (unlikely(io_watch_add(&io_h, tcpconn->s, POLLIN,
04111 F_TCPCONN, tcpconn)<0)){
04112 LOG(L_CRIT, "ERROR: tcp_main: handle_new_connect: failed to add"
04113 " new socket to the fd list\n");
04114 tcpconn->flags&=~F_CONN_READ_W;
04115 if (tcpconn_try_unhash(tcpconn))
04116 tcpconn_put_destroy(tcpconn);
04117 }
04118 #else
04119 atomic_set(&tcpconn->refcnt, 2);
04120
04121
04122 tcpconn->flags|=F_CONN_READER;
04123 tcpconn_add(tcpconn);
04124 DBG("handle_new_connect: new connection from %s: %p %d flags: %04x\n",
04125 su2a(&su, sizeof(su)), tcpconn, tcpconn->s, tcpconn->flags);
04126 if(unlikely(send2child(tcpconn)<0)){
04127 tcpconn->flags&=~F_CONN_READER;
04128 if (tcpconn_try_unhash(tcpconn))
04129 tcpconn_put(tcpconn);
04130 tcpconn_put_destroy(tcpconn);
04131 }
04132 #endif
04133 }else{
04134 LOG(L_ERR, "ERROR: handle_new_connect: tcpconn_new failed, "
04135 "closing socket\n");
04136 tcp_safe_close(new_sock);
04137 (*tcp_connections_no)--;
04138 if (unlikely(si->proto==PROTO_TLS))
04139 (*tls_connections_no)--;
04140 }
04141 return 1;
04142 }
04143
04144
04145
04146
04147
04148
04149
04150
04151
04152
04153
04154
04155
04156 inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, short ev,
04157 int fd_i)
04158 {
04159 #ifdef TCP_ASYNC
04160 int empty_q;
04161 int bytes;
04162 #endif
04163
04164
04165
04166
04167
04168
04169 #if 0
04170 if ((tcpconn->refcnt!=0)){
04171
04172
04173
04174 LOG(L_CRIT, "BUG: handle_tcpconn_ev: io event on referenced"
04175 " tcpconn (%p), refcnt=%d, fd=%d\n",
04176 tcpconn, tcpconn->refcnt, tcpconn->s);
04177 return -1;
04178 }
04179 #endif
04180
04181
04182 #ifdef TCP_ASYNC
04183 empty_q=0;
04184 if (unlikely((ev & (POLLOUT|POLLERR|POLLHUP)) &&
04185 (tcpconn->flags & F_CONN_WRITE_W))){
04186 if (unlikely((ev & (POLLERR|POLLHUP)) ||
04187 (wbufq_run(tcpconn->s, tcpconn, &empty_q)<0) ||
04188 (empty_q && tcpconn_close_after_send(tcpconn))
04189 )){
04190 if ((tcpconn->flags & F_CONN_READ_W) && (ev & POLLIN)){
04191
04192
04193
04194
04195
04196
04197
04198
04199
04200
04201 if ((ioctl(tcpconn->s, FIONREAD, &bytes)>=0) && (bytes>0)){
04202 if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)<0)){
04203 LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(1)"
04204 " failed: for %p, fd %d\n",
04205 tcpconn, tcpconn->s);
04206 }
04207 tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W|
04208 F_CONN_WANTS_RD|F_CONN_WANTS_WR);
04209 tcpconn->flags|=F_CONN_FORCE_EOF|F_CONN_WR_ERROR;
04210 goto send_to_child;
04211 }
04212
04213 }
04214 if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i,
04215 IO_FD_CLOSING) < 0)){
04216 LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del() failed:"
04217 " for %p, fd %d\n", tcpconn, tcpconn->s);
04218 }
04219 tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W|
04220 F_CONN_WANTS_RD|F_CONN_WANTS_WR);
04221 if (unlikely(ev & POLLERR)){
04222 if (unlikely(tcpconn->state==S_CONN_CONNECT)){
04223 #ifdef USE_DST_BLACKLIST
04224 dst_blacklist_su(BLST_ERR_CONNECT, tcpconn->rcv.proto,
04225 &tcpconn->rcv.src_su,
04226 &tcpconn->send_flags, 0);
04227 #endif
04228 TCP_EV_CONNECT_ERR(0, TCP_LADDR(tcpconn),
04229 TCP_LPORT(tcpconn), TCP_PSU(tcpconn),
04230 TCP_PROTO(tcpconn));
04231 TCP_STATS_CONNECT_FAILED();
04232 }else{
04233 #ifdef USE_DST_BLACKLIST
04234 dst_blacklist_su(BLST_ERR_SEND, tcpconn->rcv.proto,
04235 &tcpconn->rcv.src_su,
04236 &tcpconn->send_flags, 0);
04237 #endif
04238 TCP_STATS_CON_RESET();
04239 }
04240 }
04241 if (unlikely(!tcpconn_try_unhash(tcpconn))){
04242 LOG(L_CRIT, "BUG: tcpconn_ev: unhashed connection %p\n",
04243 tcpconn);
04244 }
04245 tcpconn_put_destroy(tcpconn);
04246 goto error;
04247 }
04248 if (empty_q){
04249 tcpconn->flags&=~F_CONN_WANTS_WR;
04250 if (!(tcpconn->flags & F_CONN_READ_W)){
04251 if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1)){
04252 LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(2)"
04253 " failed:" " for %p, fd %d\n",
04254 tcpconn, tcpconn->s);
04255 goto error;
04256 }
04257 }else{
04258 if (unlikely(io_watch_chg(&io_h, tcpconn->s,
04259 POLLIN, fd_i)==-1)){
04260 LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_chg(1)"
04261 " failed:" " for %p, fd %d\n",
04262 tcpconn, tcpconn->s);
04263 goto error;
04264 }
04265 }
04266 tcpconn->flags&=~F_CONN_WRITE_W;
04267 }
04268 ev&=~POLLOUT;
04269 }
04270 if (likely(ev && (tcpconn->flags & F_CONN_READ_W))){
04271
04272
04273
04274
04275 if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
04276 if (unlikely(io_watch_chg(&io_h, tcpconn->s, POLLOUT, fd_i)==-1)){
04277 LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_chg(2)"
04278 " failed:" " for %p, fd %d\n",
04279 tcpconn, tcpconn->s);
04280 goto error;
04281 }
04282 }else
04283 #else
04284 {
04285 #endif
04286 if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1)){
04287 LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(3)"
04288 " failed:" " for %p, fd %d\n",
04289 tcpconn, tcpconn->s);
04290 goto error;
04291 }
04292 #ifdef TCP_ASYNC
04293 send_to_child:
04294 #endif
04295 DBG("tcp: DBG: sending to child, events %x\n", ev);
04296 #ifdef POLLRDHUP
04297 tcpconn->flags|=((int)!(ev & (POLLRDHUP|POLLHUP|POLLERR)) -1) &
04298 F_CONN_EOF_SEEN;
04299 #else
04300 tcpconn->flags|=((int)!(ev & (POLLHUP|POLLERR)) -1) & F_CONN_EOF_SEEN;
04301 #endif
04302 tcpconn->flags|= ((int)!(ev & POLLPRI) -1) & F_CONN_OOB_DATA;
04303 tcpconn->flags|=F_CONN_READER;
04304 local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
04305 tcpconn->flags&=~(F_CONN_MAIN_TIMER|F_CONN_READ_W|F_CONN_WANTS_RD);
04306 tcpconn_ref(tcpconn);
04307 if (unlikely(send2child(tcpconn)<0)){
04308 tcpconn->flags&=~F_CONN_READER;
04309 #ifdef TCP_ASYNC
04310 if (tcpconn->flags & F_CONN_WRITE_W){
04311 if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i,
04312 IO_FD_CLOSING) < 0)){
04313 LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(4)"
04314 " failed:" " for %p, fd %d\n",
04315 tcpconn, tcpconn->s);
04316 }
04317 tcpconn->flags&=~F_CONN_WRITE_W;
04318 }
04319 #endif
04320 if (tcpconn_try_unhash(tcpconn))
04321 tcpconn_put(tcpconn);
04322 tcpconn_put_destroy(tcpconn);
04323 }
04324 }
04325 return 0;
04326
04327
04328 error:
04329 return -1;
04330 }
04331
04332
04333
04334
04335
04336
04337
04338
04339
04340
04341
04342
04343
04344
04345
04346
04347 inline static int handle_io(struct fd_map* fm, short ev, int idx)
04348 {
04349 int ret;
04350
04351
04352 cfg_update();
04353
04354 switch(fm->type){
04355 case F_SOCKINFO:
04356 ret=handle_new_connect((struct socket_info*)fm->data);
04357 break;
04358 case F_TCPCONN:
04359 ret=handle_tcpconn_ev((struct tcp_connection*)fm->data, ev, idx);
04360 break;
04361 case F_TCPCHILD:
04362 ret=handle_tcp_child((struct tcp_child*)fm->data, idx);
04363 break;
04364 case F_PROC:
04365 ret=handle_ser_child((struct process_table*)fm->data, idx);
04366 break;
04367 case F_NONE:
04368 LOG(L_CRIT, "BUG: handle_io: empty fd map: %p {%d, %d, %p},"
04369 " idx %d\n", fm, fm->fd, fm->type, fm->data, idx);
04370 goto error;
04371 default:
04372 LOG(L_CRIT, "BUG: handle_io: unknown fd type %d\n", fm->type);
04373 goto error;
04374 }
04375 return ret;
04376 error:
04377 return -1;
04378 }
04379
04380
04381
04382
04383 static ticks_t tcpconn_main_timeout(ticks_t t, struct timer_ln* tl, void* data)
04384 {
04385 struct tcp_connection *c;
04386 int fd;
04387 int tcp_async;
04388
04389 c=(struct tcp_connection*)data;
04390
04391
04392 #ifdef TCP_ASYNC
04393 DBG( "tcp_main: entering timer for %p (ticks=%d, timeout=%d (%d s), "
04394 "wr_timeout=%d (%d s)), write queue: %d bytes\n",
04395 c, t, c->timeout, TICKS_TO_S(c->timeout-t),
04396 c->wbuf_q.wr_timeout, TICKS_TO_S(c->wbuf_q.wr_timeout-t),
04397 c->wbuf_q.queued);
04398
04399 tcp_async=cfg_get(tcp, tcp_cfg, async);
04400 if (likely(TICKS_LT(t, c->timeout) && ( !tcp_async | _wbufq_empty(c) |
04401 TICKS_LT(t, c->wbuf_q.wr_timeout)) )){
04402 if (unlikely(tcp_async && _wbufq_non_empty(c)))
04403 return (ticks_t)MIN_unsigned(c->timeout-t, c->wbuf_q.wr_timeout-t);
04404 else
04405 return (ticks_t)(c->timeout - t);
04406 }
04407
04408 if (tcp_async && _wbufq_non_empty(c) && TICKS_GE(t, c->wbuf_q.wr_timeout)){
04409 if (unlikely(c->state==S_CONN_CONNECT)){
04410 #ifdef USE_DST_BLACKLIST
04411 dst_blacklist_su(BLST_ERR_CONNECT, c->rcv.proto, &c->rcv.src_su,
04412 &c->send_flags, 0);
04413 #endif
04414 TCP_EV_CONNECT_TIMEOUT(0, TCP_LADDR(c), TCP_LPORT(c), TCP_PSU(c),
04415 TCP_PROTO(c));
04416 TCP_STATS_CONNECT_FAILED();
04417 }else{
04418 #ifdef USE_DST_BLACKLIST
04419 dst_blacklist_su(BLST_ERR_SEND, c->rcv.proto, &c->rcv.src_su,
04420 &c->send_flags, 0);
04421 #endif
04422 TCP_EV_SEND_TIMEOUT(0, &c->rcv);
04423 TCP_STATS_SEND_TIMEOUT();
04424 }
04425 }else{
04426
04427 TCP_EV_IDLE_CONN_CLOSED(0, &c->rcv);
04428 TCP_STATS_CON_TIMEOUT();
04429 }
04430 #else
04431 if (TICKS_LT(t, c->timeout)){
04432
04433 return (ticks_t)(c->timeout - t);
04434 }
04435
04436 TCP_EV_IDLE_CONN_CLOSED(0, &c->rcv);
04437 TCP_STATS_CON_TIMEOUT();
04438 #endif
04439 DBG("tcp_main: timeout for %p\n", c);
04440 if (likely(c->flags & F_CONN_HASHED)){
04441 c->flags&=~(F_CONN_HASHED|F_CONN_MAIN_TIMER);
04442 c->state=S_CONN_BAD;
04443 TCPCONN_LOCK;
04444 _tcpconn_detach(c);
04445 TCPCONN_UNLOCK;
04446 }else{
04447 c->flags&=~F_CONN_MAIN_TIMER;
04448 LOG(L_CRIT, "BUG: tcp_main: timer: called with unhashed connection %p"
04449 "\n", c);
04450 tcpconn_ref(c);
04451 }
04452 fd=c->s;
04453 if (likely(fd>0)){
04454 if (likely(c->flags & (F_CONN_READ_W|F_CONN_WRITE_W))){
04455 io_watch_del(&io_h, fd, -1, IO_FD_CLOSING);
04456 c->flags&=~(F_CONN_READ_W|F_CONN_WRITE_W);
04457 }
04458 }
04459 tcpconn_put_destroy(c);
04460 return 0;
04461 }
04462
04463
04464
04465 static inline void tcp_timer_run(void)
04466 {
04467 ticks_t ticks;
04468
04469 ticks=get_ticks_raw();
04470 if (unlikely((ticks-tcp_main_prev_ticks)<TCPCONN_TIMEOUT_MIN_RUN)) return;
04471 tcp_main_prev_ticks=ticks;
04472 local_timer_run(&tcp_main_ltimer, ticks);
04473 }
04474
04475
04476
04477
04478
04479
04480
04481
04482
04483 static inline void tcpconn_destroy_all(void)
04484 {
04485 struct tcp_connection *c, *next;
04486 unsigned h;
04487 int fd;
04488
04489
04490 TCPCONN_LOCK;
04491 for(h=0; h<TCP_ID_HASH_SIZE; h++){
04492 c=tcpconn_id_hash[h];
04493 while(c){
04494 next=c->id_next;
04495 if (is_tcp_main){
04496
04497
04498 if ((c->flags & F_CONN_MAIN_TIMER)){
04499 local_timer_del(&tcp_main_ltimer, &c->timer);
04500 c->flags&=~F_CONN_MAIN_TIMER;
04501 }
04502 fd=c->s;
04503 if (fd>0 && (c->flags & (F_CONN_READ_W|F_CONN_WRITE_W))){
04504 io_watch_del(&io_h, fd, -1, IO_FD_CLOSING);
04505 c->flags&=~(F_CONN_READ_W|F_CONN_WRITE_W);
04506 }
04507 }else{
04508 fd=-1;
04509 }
04510 #ifdef USE_TLS
04511 if (fd>0 && c->type==PROTO_TLS)
04512 tls_close(c, fd);
04513 #endif
04514 _tcpconn_rm(c);
04515 c->flags &= ~F_CONN_HASHED;
04516 if (fd>0) {
04517 #ifdef TCP_FD_CACHE
04518 if (likely(cfg_get(tcp, tcp_cfg, fd_cache)))
04519 shutdown(fd, SHUT_RDWR);
04520 #endif
04521 tcp_safe_close(fd);
04522 }
04523 (*tcp_connections_no)--;
04524 if (unlikely(c->type==PROTO_TLS))
04525 (*tls_connections_no)--;
04526 c=next;
04527 }
04528 }
04529 TCPCONN_UNLOCK;
04530 }
04531
04532
04533
04534
04535 void tcp_main_loop()
04536 {
04537
04538 struct socket_info* si;
04539 int r;
04540
04541 is_tcp_main=1;
04542
04543 tcp_main_max_fd_no=get_max_open_fds();
04544
04545
04546 #ifdef SEND_FD_QUEUE
04547 if (init_send_fd_queues()<0){
04548 LOG(L_CRIT, "ERROR: init_tcp: could not init send fd queues\n");
04549 goto error;
04550 }
04551 #endif
04552
04553
04554 if (init_io_wait(&io_h, tcp_main_max_fd_no, tcp_poll_method)<0)
04555 goto error;
04556
04557
04558
04559 tcp_main_prev_ticks=get_ticks_raw();
04560 if (init_local_timer(&tcp_main_ltimer, get_ticks_raw())!=0){
04561 LOG(L_ERR, "ERROR: init_tcp: failed to init local timer\n");
04562 goto error;
04563 }
04564 #ifdef TCP_FD_CACHE
04565 if (cfg_get(tcp, tcp_cfg, fd_cache)) tcp_fd_cache_init();
04566 #endif
04567
04568
04569 for (si=tcp_listen; si; si=si->next){
04570 if ((si->proto==PROTO_TCP) &&(si->socket!=-1)){
04571 if (io_watch_add(&io_h, si->socket, POLLIN, F_SOCKINFO, si)<0){
04572 LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
04573 "listen socket to the fd list\n");
04574 goto error;
04575 }
04576 }else{
04577 LOG(L_CRIT, "BUG: tcp_main_loop: non tcp address in tcp_listen\n");
04578 }
04579 }
04580 #ifdef USE_TLS
04581 if (!tls_disable && tls_loaded()){
04582 for (si=tls_listen; si; si=si->next){
04583 if ((si->proto==PROTO_TLS) && (si->socket!=-1)){
04584 if (io_watch_add(&io_h, si->socket, POLLIN, F_SOCKINFO, si)<0){
04585 LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
04586 "tls listen socket to the fd list\n");
04587 goto error;
04588 }
04589 }else{
04590 LOG(L_CRIT, "BUG: tcp_main_loop: non tls address"
04591 " in tls_listen\n");
04592 }
04593 }
04594 }
04595 #endif
04596
04597
04598 for (r=1; r<process_no; r++){
04599 if (pt[r].unix_sock>0)
04600 if (io_watch_add(&io_h, pt[r].unix_sock, POLLIN,F_PROC, &pt[r])<0){
04601 LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
04602 "process %d unix socket to the fd list\n", r);
04603 goto error;
04604 }
04605 }
04606
04607 for (r=0; r<tcp_children_no; r++){
04608 if (tcp_children[r].unix_sock>0)
04609 if (io_watch_add(&io_h, tcp_children[r].unix_sock, POLLIN,
04610 F_TCPCHILD, &tcp_children[r]) <0){
04611 LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
04612 "tcp child %d unix socket to the fd list\n", r);
04613 goto error;
04614 }
04615 }
04616
04617
04618
04619 if (cfg_child_init()) goto error;
04620
04621
04622 switch(io_h.poll_method){
04623 case POLL_POLL:
04624 while(1){
04625
04626 io_wait_loop_poll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
04627 send_fd_queue_run(&send2child_q);
04628
04629 tcp_timer_run();
04630 }
04631 break;
04632 #ifdef HAVE_SELECT
04633 case POLL_SELECT:
04634 while(1){
04635 io_wait_loop_select(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
04636 send_fd_queue_run(&send2child_q);
04637 tcp_timer_run();
04638 }
04639 break;
04640 #endif
04641 #ifdef HAVE_SIGIO_RT
04642 case POLL_SIGIO_RT:
04643 while(1){
04644 io_wait_loop_sigio_rt(&io_h, TCP_MAIN_SELECT_TIMEOUT);
04645 send_fd_queue_run(&send2child_q);
04646 tcp_timer_run();
04647 }
04648 break;
04649 #endif
04650 #ifdef HAVE_EPOLL
04651 case POLL_EPOLL_LT:
04652 while(1){
04653 io_wait_loop_epoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
04654 send_fd_queue_run(&send2child_q);
04655 tcp_timer_run();
04656 }
04657 break;
04658 case POLL_EPOLL_ET:
04659 while(1){
04660 io_wait_loop_epoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 1);
04661 send_fd_queue_run(&send2child_q);
04662 tcp_timer_run();
04663 }
04664 break;
04665 #endif
04666 #ifdef HAVE_KQUEUE
04667 case POLL_KQUEUE:
04668 while(1){
04669 io_wait_loop_kqueue(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
04670 send_fd_queue_run(&send2child_q);
04671 tcp_timer_run();
04672 }
04673 break;
04674 #endif
04675 #ifdef HAVE_DEVPOLL
04676 case POLL_DEVPOLL:
04677 while(1){
04678 io_wait_loop_devpoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
04679 send_fd_queue_run(&send2child_q);
04680 tcp_timer_run();
04681 }
04682 break;
04683 #endif
04684 default:
04685 LOG(L_CRIT, "BUG: tcp_main_loop: no support for poll method "
04686 " %s (%d)\n",
04687 poll_method_name(io_h.poll_method), io_h.poll_method);
04688 goto error;
04689 }
04690 error:
04691 #ifdef SEND_FD_QUEUE
04692 destroy_send_fd_queues();
04693 #endif
04694 destroy_io_wait(&io_h);
04695 LOG(L_CRIT, "ERROR: tcp_main_loop: exiting...");
04696 exit(-1);
04697 }
04698
04699
04700
04701
04702 void destroy_tcp()
04703 {
04704 if (tcpconn_id_hash){
04705 if (tcpconn_lock)
04706 TCPCONN_UNLOCK;
04707
04708
04709
04710 tcpconn_destroy_all();
04711 shm_free(tcpconn_id_hash);
04712 tcpconn_id_hash=0;
04713 }
04714 DESTROY_TCP_STATS();
04715 if (tcp_connections_no){
04716 shm_free(tcp_connections_no);
04717 tcp_connections_no=0;
04718 }
04719 if (tls_connections_no){
04720 shm_free(tls_connections_no);
04721 tls_connections_no=0;
04722 }
04723 #ifdef TCP_ASYNC
04724 if (tcp_total_wq){
04725 shm_free(tcp_total_wq);
04726 tcp_total_wq=0;
04727 }
04728 #endif
04729 if (connection_id){
04730 shm_free(connection_id);
04731 connection_id=0;
04732 }
04733 if (tcpconn_aliases_hash){
04734 shm_free(tcpconn_aliases_hash);
04735 tcpconn_aliases_hash=0;
04736 }
04737 if (tcpconn_lock){
04738 lock_destroy(tcpconn_lock);
04739 lock_dealloc((void*)tcpconn_lock);
04740 tcpconn_lock=0;
04741 }
04742 if (tcp_children){
04743 pkg_free(tcp_children);
04744 tcp_children=0;
04745 }
04746 destroy_local_timer(&tcp_main_ltimer);
04747 }
04748
04749
04750
04751 int init_tcp()
04752 {
04753 char* poll_err;
04754
04755 tcp_options_check();
04756 if (tcp_cfg==0){
04757 BUG("tcp_cfg not initialized\n");
04758 goto error;
04759 }
04760
04761 tcpconn_lock=lock_alloc();
04762 if (tcpconn_lock==0){
04763 LOG(L_CRIT, "ERROR: init_tcp: could not alloc lock\n");
04764 goto error;
04765 }
04766 if (lock_init(tcpconn_lock)==0){
04767 LOG(L_CRIT, "ERROR: init_tcp: could not init lock\n");
04768 lock_dealloc((void*)tcpconn_lock);
04769 tcpconn_lock=0;
04770 goto error;
04771 }
04772
04773 tcp_connections_no=shm_malloc(sizeof(int));
04774 if (tcp_connections_no==0){
04775 LOG(L_CRIT, "ERROR: init_tcp: could not alloc globals\n");
04776 goto error;
04777 }
04778 *tcp_connections_no=0;
04779 tls_connections_no=shm_malloc(sizeof(int));
04780 if (tls_connections_no==0){
04781 LOG(L_CRIT, "ERROR: init_tcp: could not alloc globals\n");
04782 goto error;
04783 }
04784 *tls_connections_no=0;
04785 if (INIT_TCP_STATS()!=0) goto error;
04786 connection_id=shm_malloc(sizeof(int));
04787 if (connection_id==0){
04788 LOG(L_CRIT, "ERROR: init_tcp: could not alloc globals\n");
04789 goto error;
04790 }
04791 *connection_id=1;
04792 #ifdef TCP_ASYNC
04793 tcp_total_wq=shm_malloc(sizeof(*tcp_total_wq));
04794 if (tcp_total_wq==0){
04795 LOG(L_CRIT, "ERROR: init_tcp: could not alloc globals\n");
04796 goto error;
04797 }
04798 #endif
04799
04800 tcpconn_aliases_hash=(struct tcp_conn_alias**)
04801 shm_malloc(TCP_ALIAS_HASH_SIZE* sizeof(struct tcp_conn_alias*));
04802 if (tcpconn_aliases_hash==0){
04803 LOG(L_CRIT, "ERROR: init_tcp: could not alloc address hashtable\n");
04804 goto error;
04805 }
04806 tcpconn_id_hash=(struct tcp_connection**)shm_malloc(TCP_ID_HASH_SIZE*
04807 sizeof(struct tcp_connection*));
04808 if (tcpconn_id_hash==0){
04809 LOG(L_CRIT, "ERROR: init_tcp: could not alloc id hashtable\n");
04810 goto error;
04811 }
04812
04813 memset((void*)tcpconn_aliases_hash, 0,
04814 TCP_ALIAS_HASH_SIZE * sizeof(struct tcp_conn_alias*));
04815 memset((void*)tcpconn_id_hash, 0,
04816 TCP_ID_HASH_SIZE * sizeof(struct tcp_connection*));
04817
04818
04819 poll_err=check_poll_method(tcp_poll_method);
04820
04821
04822 if (poll_err || (tcp_poll_method==0)){
04823 tcp_poll_method=choose_poll_method();
04824 if (poll_err){
04825 LOG(L_ERR, "ERROR: init_tcp: %s, using %s instead\n",
04826 poll_err, poll_method_name(tcp_poll_method));
04827 }else{
04828 LOG(L_INFO, "init_tcp: using %s as the io watch method"
04829 " (auto detected)\n", poll_method_name(tcp_poll_method));
04830 }
04831 }else{
04832 LOG(L_INFO, "init_tcp: using %s io watch method (config)\n",
04833 poll_method_name(tcp_poll_method));
04834 }
04835
04836 return 0;
04837 error:
04838
04839 destroy_tcp();
04840 return -1;
04841 }
04842
04843
04844 #ifdef TCP_CHILD_NON_BLOCKING
04845
04846 static int set_non_blocking(int s)
04847 {
04848 int flags;
04849
04850 flags=fcntl(s, F_GETFL);
04851 if (flags==-1){
04852 LOG(L_ERR, "ERROR: set_non_blocking: fnctl failed: (%d) %s\n",
04853 errno, strerror(errno));
04854 goto error;
04855 }
04856 if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
04857 LOG(L_ERR, "ERROR: set_non_blocking: fcntl: set non-blocking failed:"
04858 " (%d) %s\n", errno, strerror(errno));
04859 goto error;
04860 }
04861 return 0;
04862 error:
04863 return -1;
04864 }
04865
04866 #endif
04867
04868
04869
/** Prepares a socket pair used to talk to a tcp child.
 *
 * When compiled with TCP_CHILD_NON_BLOCKING both descriptors are switched
 * to non-blocking mode (fd[0] first; fd[1] is not touched if fd[0] fails);
 * otherwise nothing is done.
 *
 * @param fd - array holding the two descriptors.
 * @return 0 on success, -1 if a descriptor could not be made non-blocking.
 */
int tcp_fix_child_sockets(int* fd)
{
#ifdef TCP_CHILD_NON_BLOCKING
	int k;

	for (k=0; k<2; k++){
		if (set_non_blocking(fd[k])<0)
			return -1;
	}
#endif
	return 0;
}
04880
04881
04882
04883
04884 int tcp_init_children()
04885 {
04886 int r, i;
04887 int reader_fd_1;
04888 pid_t pid;
04889 char si_desc[MAX_PT_DESC];
04890 struct socket_info *si;
04891
04892
04893
04894
04895
04896 for(r=0, si=tcp_listen; si; si=si->next, r++);
04897 #ifdef USE_TLS
04898 if (! tls_disable)
04899 for (si=tls_listen; si; si=si->next, r++);
04900 #endif
04901
04902 register_fds(r+tcp_max_connections+get_max_procs()-1 );
04903 #if 0
04904 tcp_max_fd_no=get_max_procs()*2 +r-1 +3;
04905
04906
04907
04908 tcp_max_fd_no+=tcp_max_connections+get_max_procs()-1 ;
04909 #endif
04910
04911 tcp_children=pkg_malloc(sizeof(struct tcp_child)*tcp_children_no);
04912 if (tcp_children==0){
04913 LOG(L_ERR, "ERROR: tcp_init_children: out of memory\n");
04914 goto error;
04915 }
04916 memset(tcp_children, 0, sizeof(struct tcp_child)*tcp_children_no);
04917
04918
04919
04920 i = tcp_children_no-1;
04921 for(si=tcp_listen; si; si=si->next) {
04922 if(si->workers>0) {
04923 si->workers_tcpidx = i - si->workers + 1;
04924 for(r=0; r<si->workers; r++) {
04925 tcp_children[i].mysocket = si;
04926 i--;
04927 }
04928 }
04929 }
04930 tcp_sockets_gworkers = (i != tcp_children_no-1)?(1 + i + 1):0;
04931
04932
04933
04934
04935
04936 for(r=0; r<tcp_children_no; r++){
04937 child_rank++;
04938 snprintf(si_desc, MAX_PT_DESC, "tcp receiver (%s)",
04939 (tcp_children[r].mysocket!=NULL)?
04940 tcp_children[r].mysocket->sock_str.s:"generic");
04941 pid=fork_tcp_process(child_rank, si_desc, r, &reader_fd_1);
04942 if (pid<0){
04943 LOG(L_ERR, "ERROR: tcp_main: fork failed: %s\n",
04944 strerror(errno));
04945 goto error;
04946 }else if (pid>0){
04947
04948 }else{
04949
04950 bind_address=0;
04951
04952 tcp_receive_loop(reader_fd_1);
04953 }
04954 }
04955 return 0;
04956 error:
04957 return -1;
04958 }
04959
04960
04961
04962 void tcp_get_info(struct tcp_gen_info *ti)
04963 {
04964 ti->tcp_readers=tcp_children_no;
04965 ti->tcp_max_connections=tcp_max_connections;
04966 ti->tls_max_connections=tls_max_connections;
04967 ti->tcp_connections_no=*tcp_connections_no;
04968 ti->tls_connections_no=*tls_connections_no;
04969 #ifdef TCP_ASYNC
04970 ti->tcp_write_queued=*tcp_total_wq;
04971 #else
04972 ti->tcp_write_queued=0;
04973 #endif
04974 }
04975
04976 #endif