4fb0290889dd3a73fe8adf8d608383dee2643435
[OpenSRF.git] / src / libopensrf / transport_session.c
1 #include <opensrf/transport_session.h>
2
3
4
5 // ---------------------------------------------------------------------------------
6 // returns a built and allocated transport_session object.
7 // This codes does no network activity, only memory initilization
8 // ---------------------------------------------------------------------------------
9 transport_session* init_transport(  const char* server, 
10         int port, const char* unix_path, void* user_data, int component ) {
11
12         /* create the session struct */
13         transport_session* session = 
14                 (transport_session*) safe_malloc( sizeof(transport_session) );
15
16         session->user_data = user_data;
17
18         session->component = component;
19
20         /* initialize the data buffers */
21         session->body_buffer                    = buffer_init( JABBER_BODY_BUFSIZE );
22         session->subject_buffer         = buffer_init( JABBER_SUBJECT_BUFSIZE );
23         session->thread_buffer          = buffer_init( JABBER_THREAD_BUFSIZE );
24         session->from_buffer                    = buffer_init( JABBER_JID_BUFSIZE );
25         session->status_buffer          = buffer_init( JABBER_STATUS_BUFSIZE );
26         session->recipient_buffer       = buffer_init( JABBER_JID_BUFSIZE );
27         session->message_error_type = buffer_init( JABBER_JID_BUFSIZE );
28         session->session_id                     = buffer_init( 64 ); 
29
30         /* for OpenSRF extensions */
31         session->router_to_buffer               = buffer_init( JABBER_JID_BUFSIZE );
32         session->router_from_buffer     = buffer_init( JABBER_JID_BUFSIZE );
33         session->osrf_xid_buffer        = buffer_init( JABBER_JID_BUFSIZE );
34         session->router_class_buffer    = buffer_init( JABBER_JID_BUFSIZE );
35         session->router_command_buffer  = buffer_init( JABBER_JID_BUFSIZE );
36
37
38         if(     session->body_buffer            == NULL || session->subject_buffer       == NULL        ||
39                         session->thread_buffer  == NULL || session->from_buffer          == NULL        ||
40                         session->status_buffer  == NULL || session->recipient_buffer == NULL ||
41                         session->router_to_buffer       == NULL || session->router_from_buffer   == NULL ||
42                         session->router_class_buffer == NULL || session->router_command_buffer == NULL ||
43                         session->session_id == NULL ) { 
44
45                 osrfLogError(OSRF_LOG_MARK,  "init_transport(): buffer_init returned NULL" );
46                 return 0;
47         }
48
49
50         /* initialize the jabber state machine */
51         session->state_machine = (jabber_machine*) safe_malloc( sizeof(jabber_machine) );
52
53         /* initialize the sax push parser */
54         session->parser_ctxt = xmlCreatePushParserCtxt(SAXHandler, session, "", 0, NULL);
55
56         /* initialize the transport_socket structure */
57         session->sock_mgr = (socket_manager*) safe_malloc( sizeof(socket_manager) );
58
59         session->sock_mgr->data_received = &grab_incoming;
60         session->sock_mgr->blob = session;
61         
62         session->port = port;
63         session->server = strdup(server);
64         if(unix_path)   
65                 session->unix_path = strdup(unix_path);
66         else session->unix_path = NULL;
67
68         session->sock_id = 0;
69
70         return session;
71 }
72
73
74
75 /* XXX FREE THE BUFFERS */
76 int session_free( transport_session* session ) {
77         if( ! session ) { return 0; }
78
79         if(session->sock_mgr)
80                 socket_manager_free(session->sock_mgr);
81
82         if( session->state_machine ) free( session->state_machine );
83         if( session->parser_ctxt) {
84                 xmlFreeDoc( session->parser_ctxt->myDoc );
85                 xmlFreeParserCtxt(session->parser_ctxt);
86         }
87
88         xmlCleanupCharEncodingHandlers();
89         xmlDictCleanup();
90         xmlCleanupParser();
91
92         buffer_free(session->body_buffer);
93         buffer_free(session->subject_buffer);
94         buffer_free(session->thread_buffer);
95         buffer_free(session->from_buffer);
96         buffer_free(session->recipient_buffer);
97         buffer_free(session->status_buffer);
98         buffer_free(session->message_error_type);
99         buffer_free(session->router_to_buffer);
100         buffer_free(session->router_from_buffer);
101         buffer_free(session->osrf_xid_buffer);
102         buffer_free(session->router_class_buffer);
103         buffer_free(session->router_command_buffer);
104         buffer_free(session->session_id);
105
106         free(session->server);
107         free(session->unix_path);
108
109         free( session );
110         return 1;
111 }
112
113
114 int session_wait( transport_session* session, int timeout ) {
115         if( ! session || ! session->sock_mgr ) {
116                 return 0;
117         }
118
119         int ret =  socket_wait( session->sock_mgr, timeout, session->sock_id );
120
121         if( ret ) {
122                 osrfLogDebug(OSRF_LOG_MARK, "socket_wait returned error code %d", ret);
123                 session->state_machine->connected = 0;
124         }
125         return ret;
126 }
127
128 int session_send_msg( 
129                 transport_session* session, transport_message* msg ) {
130
131         if( ! session ) { return -1; }
132
133         if( ! session->state_machine->connected ) {
134                 osrfLogWarning(OSRF_LOG_MARK, "State machine is not connected in send_msg()");
135                 return -1;
136         }
137
138         message_prepare_xml( msg );
139         //tcp_send( session->sock_obj, msg->msg_xml );
140         return socket_send( session->sock_id, msg->msg_xml );
141
142 }
143
144
145 /* connects to server and connects to jabber */
146 int session_connect( transport_session* session, 
147                 const char* username, const char* password, 
148                 const char* resource, int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type ) {
149
150         int size1 = 0;
151         int size2 = 0;
152
153         if( ! session ) { 
154                 osrfLogWarning(OSRF_LOG_MARK,  "session is null in connect" );
155                 return 0; 
156         }
157
158
159         //char* server = session->sock_obj->server;
160         char* server = session->server;
161
162         if( ! session->sock_id ) {
163
164                 if(session->port > 0) {
165                         if( (session->sock_id = socket_open_tcp_client(
166                                 session->sock_mgr, session->port, session->server)) <= 0 ) 
167                         return 0;
168
169                 } else if(session->unix_path != NULL) {
170                         if( (session->sock_id = socket_open_unix_client(
171                                 session->sock_mgr, session->unix_path)) <= 0 ) 
172                         return 0;
173                 }
174                 else {
175                         osrfLogWarning( OSRF_LOG_MARK, "Can't open session: no port or unix path" );
176                         return 0;
177                 }
178         }
179
180         if( session->component ) {
181
182                 /* the first Jabber connect stanza */
183                 char* our_hostname = getenv("HOSTNAME");
184                 size1 = 150 + strlen( server );
185                 char stanza1[ size1 ]; 
186                 memset( stanza1, 0, size1 );
187                 sprintf( stanza1, 
188                                 "<stream:stream version='1.0' xmlns:stream='http://etherx.jabber.org/streams' "
189                                 "xmlns='jabber:component:accept' to='%s' from='%s' xml:lang='en'>",
190                                 username, our_hostname );
191
192                 /* send the first stanze */
193                 session->state_machine->connecting = CONNECTING_1;
194
195 //              if( ! tcp_send( session->sock_obj, stanza1 ) ) {
196                 if( socket_send( session->sock_id, stanza1 ) ) {
197                         osrfLogWarning(OSRF_LOG_MARK, "error sending");
198                         return 0;
199                 }
200         
201                 /* wait for reply */
202                 //tcp_wait( session->sock_obj, connect_timeout ); /* make the timeout smarter XXX */
203                 socket_wait(session->sock_mgr, connect_timeout, session->sock_id);
204         
205                 /* server acknowledges our existence, now see if we can login */
206                 if( session->state_machine->connecting == CONNECTING_2 ) {
207         
208                         int ss = session->session_id->n_used + strlen(password) + 5;
209                         char hashstuff[ss];
210                         memset(hashstuff,0,ss);
211                         sprintf( hashstuff, "%s%s", session->session_id->buf, password );
212
213                         char* hash = shahash( hashstuff );
214                         size2 = 100 + strlen( hash );
215                         char stanza2[ size2 ];
216                         memset( stanza2, 0, size2 );
217                         sprintf( stanza2, "<handshake>%s</handshake>", hash );
218         
219                         //if( ! tcp_send( session->sock_obj, stanza2 )  ) {
220                         if( socket_send( session->sock_id, stanza2 )  ) {
221                                 osrfLogWarning(OSRF_LOG_MARK, "error sending");
222                                 return 0;
223                         }
224                 }
225
226         } else { /* we're not a component */
227
228                 /* the first Jabber connect stanza */
229                 size1 = 100 + strlen( server );
230                 char stanza1[ size1 ]; 
231                 memset( stanza1, 0, size1 );
232                 sprintf( stanza1, 
233                                 "<stream:stream to='%s' xmlns='jabber:client' "
234                                 "xmlns:stream='http://etherx.jabber.org/streams'>",
235                         server );
236         
237
238                 /* send the first stanze */
239                 session->state_machine->connecting = CONNECTING_1;
240                 //if( ! tcp_send( session->sock_obj, stanza1 ) ) {
241                 if( socket_send( session->sock_id, stanza1 ) ) {
242                         osrfLogWarning(OSRF_LOG_MARK, "error sending");
243                         return 0;
244                 }
245
246
247                 /* wait for reply */
248                 //tcp_wait( session->sock_obj, connect_timeout ); /* make the timeout smarter XXX */
249                 socket_wait( session->sock_mgr, connect_timeout, session->sock_id ); /* make the timeout smarter XXX */
250
251                 if( auth_type == AUTH_PLAIN ) {
252
253                         /* the second jabber connect stanza including login info*/
254                         size2 = 150 + strlen( username ) + strlen(password) + strlen(resource);
255                         char stanza2[ size2 ];
256                         memset( stanza2, 0, size2 );
257                 
258                         sprintf( stanza2, 
259                                         "<iq id='123456789' type='set'><query xmlns='jabber:iq:auth'>"
260                                         "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>",
261                                         username, password, resource );
262         
263                         /* server acknowledges our existence, now see if we can login */
264                         if( session->state_machine->connecting == CONNECTING_2 ) {
265                                 //if( ! tcp_send( session->sock_obj, stanza2 )  ) {
266                                 if( socket_send( session->sock_id, stanza2 )  ) {
267                                         osrfLogWarning(OSRF_LOG_MARK, "error sending");
268                                         return 0;
269                                 }
270                         }
271
272                 } else if( auth_type == AUTH_DIGEST ) {
273
274                         int ss = session->session_id->n_used + strlen(password) + 5;
275                         char hashstuff[ss];
276                         memset(hashstuff,0,ss);
277                         sprintf( hashstuff, "%s%s", session->session_id->buf, password );
278
279                         char* hash = shahash( hashstuff );
280
281                         /* the second jabber connect stanza including login info*/
282                         size2 = 150 + strlen( hash ) + strlen(password) + strlen(resource);
283                         char stanza2[ size2 ];
284                         memset( stanza2, 0, size2 );
285                 
286                         sprintf( stanza2, 
287                                         "<iq id='123456789' type='set'><query xmlns='jabber:iq:auth'>"
288                                         "<username>%s</username><digest>%s</digest><resource>%s</resource></query></iq>",
289                                         username, hash, resource );
290         
291                         /* server acknowledges our existence, now see if we can login */
292                         if( session->state_machine->connecting == CONNECTING_2 ) {
293                                 //if( ! tcp_send( session->sock_obj, stanza2 )  ) {
294                                 if( socket_send( session->sock_id, stanza2 )  ) {
295                                         osrfLogWarning(OSRF_LOG_MARK, "error sending");
296                                         return 0;
297                                 }
298                         }
299
300                 }
301
302         } // not component
303
304
305         /* wait for reply */
306         //tcp_wait( session->sock_obj, connect_timeout );
307         socket_wait( session->sock_mgr, connect_timeout, session->sock_id );
308
309         if( session->state_machine->connected ) {
310                 /* yar! */
311                 return 1;
312         }
313
314         return 0;
315 }
316
317 // ---------------------------------------------------------------------------------
318 // TCP data callback. Shove the data into the push parser.
319 // ---------------------------------------------------------------------------------
320 //void grab_incoming( void * session, char* data ) {
321 void grab_incoming(void* blob, socket_manager* mgr, int sockid, char* data, int parent) {
322         transport_session* ses = (transport_session*) blob;
323         if( ! ses ) { return; }
324         xmlParseChunk(ses->parser_ctxt, data, strlen(data), 0);
325 }
326
327
328 void startElementHandler(
329         void *session, const xmlChar *name, const xmlChar **atts) {
330
331         transport_session* ses = (transport_session*) session;
332         if( ! ses ) { return; }
333
334         
335         if( strcmp( (char*) name, "message" ) == 0 ) {
336                 ses->state_machine->in_message = 1;
337                 buffer_add( ses->from_buffer, get_xml_attr( atts, "from" ) );
338                 buffer_add( ses->recipient_buffer, get_xml_attr( atts, "to" ) );
339                 buffer_add( ses->router_from_buffer, get_xml_attr( atts, "router_from" ) );
340                 buffer_add( ses->osrf_xid_buffer, get_xml_attr( atts, "osrf_xid" ) );
341                 buffer_add( ses->router_to_buffer, get_xml_attr( atts, "router_to" ) );
342                 buffer_add( ses->router_class_buffer, get_xml_attr( atts, "router_class" ) );
343                 buffer_add( ses->router_command_buffer, get_xml_attr( atts, "router_command" ) );
344                 char* broadcast = get_xml_attr( atts, "broadcast" );
345                 if( broadcast )
346                         ses->router_broadcast = atoi( broadcast );
347
348                 return;
349         }
350
351         if( ses->state_machine->in_message ) {
352
353                 if( strcmp( (char*) name, "body" ) == 0 ) {
354                         ses->state_machine->in_message_body = 1;
355                         return;
356                 }
357         
358                 if( strcmp( (char*) name, "subject" ) == 0 ) {
359                         ses->state_machine->in_subject = 1;
360                         return;
361                 }
362         
363                 if( strcmp( (char*) name, "thread" ) == 0 ) {
364                         ses->state_machine->in_thread = 1;
365                         return;
366                 }
367
368         }
369
370         if( strcmp( (char*) name, "presence" ) == 0 ) {
371                 ses->state_machine->in_presence = 1;
372                 buffer_add( ses->from_buffer, get_xml_attr( atts, "from" ) );
373                 buffer_add( ses->recipient_buffer, get_xml_attr( atts, "to" ) );
374                 return;
375         }
376
377         if( strcmp( (char*) name, "status" ) == 0 ) {
378                 ses->state_machine->in_status = 1;
379                 return;
380         }
381
382
383         if( strcmp( (char*) name, "stream:error" ) == 0 ) {
384                 ses->state_machine->in_error = 1;
385                 ses->state_machine->connected = 0;
386                 osrfLogWarning(  OSRF_LOG_MARK, "Received <stream:error> message from Jabber server" );
387                 return;
388         }
389
390
391         /* first server response from a connect attempt */
392         if( strcmp( (char*) name, "stream:stream" ) == 0 ) {
393                 if( ses->state_machine->connecting == CONNECTING_1 ) {
394                         ses->state_machine->connecting = CONNECTING_2;
395                         buffer_add( ses->session_id, get_xml_attr(atts, "id") );
396                 }
397         }
398
399         if( strcmp( (char*) name, "handshake" ) == 0 ) {
400                 ses->state_machine->connected = 1;
401                 ses->state_machine->connecting = 0;
402                 return;
403         }
404
405
406         if( strcmp( (char*) name, "error" ) == 0 ) {
407                 ses->state_machine->in_message_error = 1;
408                 buffer_add( ses->message_error_type, get_xml_attr( atts, "type" ) );
409                 ses->message_error_code = atoi( get_xml_attr( atts, "code" ) );
410                 osrfLogInfo( OSRF_LOG_MARK,  "Received <error> message with type %s and code %s", 
411                         get_xml_attr( atts, "type"), get_xml_attr( atts, "code") );
412                 return;
413         }
414
415         if( strcmp( (char*) name, "iq" ) == 0 ) {
416                 ses->state_machine->in_iq = 1;
417
418                 if( strcmp( get_xml_attr(atts, "type"), "result") == 0 
419                                 && ses->state_machine->connecting == CONNECTING_2 ) {
420                         ses->state_machine->connected = 1;
421                         ses->state_machine->connecting = 0;
422                         return;
423                 }
424
425                 if( strcmp( get_xml_attr(atts, "type"), "error") == 0 ) {
426                         osrfLogWarning( OSRF_LOG_MARK,  "Error connecting to jabber" );
427                         return;
428                 }
429         }
430 }
431
432 char* get_xml_attr( const xmlChar** atts, char* attr_name ) {
433         int i;
434         if (atts != NULL) {
435                 for(i = 0;(atts[i] != NULL);i++) {
436                         if( strcmp( (char*) atts[i++], attr_name ) == 0 ) {
437                                 if( atts[i] != NULL ) {
438                                         return (char*) atts[i];
439                                 }
440                         }
441                 }
442         }
443         return NULL;
444 }
445
446
447 // ------------------------------------------------------------------
448 // See which tags are ending
449 // ------------------------------------------------------------------
450 void endElementHandler( void *session, const xmlChar *name) {
451         transport_session* ses = (transport_session*) session;
452         if( ! ses ) { return; }
453
454         if( strcmp( (char*) name, "message" ) == 0 ) {
455
456
457                 /* pass off the message info the callback */
458                 if( ses->message_callback ) {
459
460                         /* here it's ok to pass in the raw buffers because
461                                 message_init allocates new space for the chars 
462                                 passed in */
463                         transport_message* msg =  message_init( 
464                                 ses->body_buffer->buf, 
465                                 ses->subject_buffer->buf,
466                                 ses->thread_buffer->buf, 
467                                 ses->recipient_buffer->buf, 
468                                 ses->from_buffer->buf );
469
470                         message_set_router_info( msg, 
471                                 ses->router_from_buffer->buf, 
472                                 ses->router_to_buffer->buf, 
473                                 ses->router_class_buffer->buf,
474                                 ses->router_command_buffer->buf,
475                                 ses->router_broadcast );
476
477          message_set_osrf_xid( msg, ses->osrf_xid_buffer->buf );
478
479                         if( ses->message_error_type->n_used > 0 ) {
480                                 set_msg_error( msg, ses->message_error_type->buf, ses->message_error_code );
481                         }
482
483                         if( msg == NULL ) { return; }
484                         ses->message_callback( ses->user_data, msg );
485                 }
486
487                 ses->state_machine->in_message = 0;
488                 reset_session_buffers( session );
489                 return;
490         }
491         
492         if( strcmp( (char*) name, "body" ) == 0 ) {
493                 ses->state_machine->in_message_body = 0;
494                 return;
495         }
496
497         if( strcmp( (char*) name, "subject" ) == 0 ) {
498                 ses->state_machine->in_subject = 0;
499                 return;
500         }
501
502         if( strcmp( (char*) name, "thread" ) == 0 ) {
503                 ses->state_machine->in_thread = 0;
504                 return;
505         }
506         
507         if( strcmp( (char*) name, "iq" ) == 0 ) {
508                 ses->state_machine->in_iq = 0;
509                 if( ses->message_error_code > 0 ) {
510                         osrfLogWarning( OSRF_LOG_MARK,  "Error in IQ packet: code %d",  ses->message_error_code );
511                         osrfLogWarning( OSRF_LOG_MARK,  "Error 401 means not authorized" );
512                 }
513                 reset_session_buffers( session );
514                 return;
515         }
516
517         if( strcmp( (char*) name, "presence" ) == 0 ) {
518                 ses->state_machine->in_presence = 0;
519                 /*
520                 if( ses->presence_callback ) {
521                         // call the callback with the status, etc.
522                 }
523                 */
524                 reset_session_buffers( session );
525                 return;
526         }
527
528         if( strcmp( (char*) name, "status" ) == 0 ) {
529                 ses->state_machine->in_status = 0;
530                 return;
531         }
532
533         if( strcmp( (char*) name, "error" ) == 0 ) {
534                 ses->state_machine->in_message_error = 0;
535                 return;
536         }
537
538         if( strcmp( (char*) name, "error:error" ) == 0 ) {
539                 ses->state_machine->in_error = 0;
540                 return;
541         }
542 }
543
544 int reset_session_buffers( transport_session* ses ) {
545         buffer_reset( ses->body_buffer );
546         buffer_reset( ses->subject_buffer );
547         buffer_reset( ses->thread_buffer );
548         buffer_reset( ses->from_buffer );
549         buffer_reset( ses->recipient_buffer );
550         buffer_reset( ses->router_from_buffer );
551         buffer_reset( ses->osrf_xid_buffer );
552         buffer_reset( ses->router_to_buffer );
553         buffer_reset( ses->router_class_buffer );
554         buffer_reset( ses->router_command_buffer );
555         buffer_reset( ses->message_error_type );
556         buffer_reset( ses->session_id );
557
558         return 1;
559 }
560
561 // ------------------------------------------------------------------
562 // takes data out of the body of the message and pushes it into
563 // the appropriate buffer
564 // ------------------------------------------------------------------
565 void characterHandler(
566                 void *session, const xmlChar *ch, int len) {
567
568         char data[len+1];
569         memset( data, 0, len+1 );
570         strncpy( data, (char*) ch, len );
571         data[len] = 0;
572
573         //printf( "Handling characters: %s\n", data );
574         transport_session* ses = (transport_session*) session;
575         if( ! ses ) { return; }
576
577         /* set the various message parts */
578         if( ses->state_machine->in_message ) {
579
580                 if( ses->state_machine->in_message_body ) {
581                         buffer_add( ses->body_buffer, data );
582                 }
583
584                 if( ses->state_machine->in_subject ) {
585                         buffer_add( ses->subject_buffer, data );
586                 }
587
588                 if( ses->state_machine->in_thread ) {
589                         buffer_add( ses->thread_buffer, data );
590                 }
591         }
592
593         /* set the presence status */
594         if( ses->state_machine->in_presence && ses->state_machine->in_status ) {
595                 buffer_add( ses->status_buffer, data );
596         }
597
598         if( ses->state_machine->in_error ) {
599                 /* for now... */
600                 osrfLogWarning( OSRF_LOG_MARK,  "ERROR Xml fragment: %s\n", ch );
601         }
602
603 }
604
/*
 * Printf-style warning callback for the XML parser.
 * Writes a fixed prefix followed by the formatted message to stdout, then
 * writes the unexpanded format string to stderr.  `session` is unused.
 */
void  parseWarningHandler( void *session, const char* msg, ... ) {

	va_list ap;

	fputs( "transport_session XML WARNING", stdout );

	va_start( ap, msg );
	vfprintf( stdout, msg, ap );
	va_end( ap );

	fprintf( stderr, "XML WARNING: %s\n", msg );
}
615
/*
 * Printf-style error callback for the XML parser.
 * Writes a fixed prefix followed by the formatted message to stdout, then
 * writes the unexpanded format string to stderr.  `session` is unused.
 */
void  parseErrorHandler( void *session, const char* msg, ... ){

	va_list ap;

	fputs( "transport_session XML ERROR", stdout );

	va_start( ap, msg );
	vfprintf( stdout, msg, ap );
	va_end( ap );

	fprintf( stderr, "XML ERROR: %s\n", msg );
}
626
627 int session_disconnect( transport_session* session ) {
628         if( session == NULL ) { return 0; }
629         //tcp_send( session->sock_obj, "</stream:stream>");
630         socket_send(session->sock_id, "</stream:stream>");
631         socket_disconnect(session->sock_mgr, session->sock_id);
632         return 0;
633         //return tcp_disconnect( session->sock_obj );
634 }
635