/* * OpenMRCP - Open Source Media Resource Control Protocol Stack * Copyright (C) 2007, Cepstral LLC * * Version: MPL 1.1 * * The contents of this file are subject to the Mozilla Public License Version * 1.1 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * http://www.mozilla.org/MPL/ * * Software distributed under the License is distributed on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * * Author(s): * Arsen Chaloyan * * Contributor(s): * */ #include #include #include "mrcp_server_proto_agent.h" #include "apt_producer_task.h" #include "mrcp_parser.h" #define MRCP_CLIENT_MAX_COUNT 10 typedef struct mrcp_v2_connection_t mrcp_v2_connection_t; struct mrcp_v2_connection_t { mrcp_connection_t *base; apr_pool_t *pool; apr_socket_t *sock; /* accepted socket */ apr_pollfd_t sock_pfd; char *remote_ip; size_t access_count; mrcp_v2_connection_t *next; mrcp_v2_connection_t *prev; }; typedef struct mrcp_v2_agent_t mrcp_v2_agent_t; struct mrcp_v2_agent_t { mrcp_proto_agent_t proto_agent; apt_producer_task_t *producer_task; apr_pool_t *pool; apr_pollset_t *pollset; apr_sockaddr_t *sockaddr; apr_socket_t *listen_sock; /* listening socket */ apr_pollfd_t listen_sock_pfd; mrcp_v2_connection_t *connection_head; mrcp_v2_connection_t *connection_tail; }; typedef enum { PROTO_AGENT_EVENT_PROCESS_CONNECTION, PROTO_AGENT_EVENT_ACCEPT_CONNECTION, PROTO_AGENT_EVENT_STARTED, PROTO_AGENT_EVENT_TERMINATED } mrcp_proto_agent_event_id; typedef struct mrcp_proto_agent_event_t mrcp_proto_agent_event_t; struct mrcp_proto_agent_event_t { mrcp_proto_agent_event_id id; mrcp_v2_connection_t *connection; }; static mrcp_status_t mrcp_server_agent_destroy(mrcp_module_t *module); static mrcp_module_state_t mrcp_server_agent_open(mrcp_module_t *module); static mrcp_module_state_t mrcp_server_agent_close(mrcp_module_t *module); static mrcp_status_t mrcp_server_agent_signal_handler(mrcp_module_t *module, apt_task_msg_t *msg); static const mrcp_module_method_set_t module_method_set = { mrcp_server_agent_destroy, mrcp_server_agent_open, mrcp_server_agent_close, mrcp_server_agent_signal_handler }; static mrcp_connection_t* mrcp_server_agent_connection_add(mrcp_proto_agent_t *proto_agent, mrcp_signaling_channel_t *channel, const char *remote_ip); static const mrcp_proto_agent_method_set_t agent_method_set = { mrcp_server_agent_connection_add, }; static mrcp_status_t mrcp_server_agent_connection_remove(mrcp_connection_t *base); static mrcp_status_t mrcp_server_agent_message_send(mrcp_connection_t *base, mrcp_message_t *mrcp_message); static const mrcp_connection_method_set_t connection_method_set = { mrcp_server_agent_connection_remove, mrcp_server_agent_message_send }; static mrcp_v2_connection_t* mrcp_server_agent_connection_find(mrcp_v2_agent_t *agent, const char *remote_ip); static mrcp_status_t mrcp_server_agent_socket_create(mrcp_v2_agent_t *agent) { apr_status_t status; agent->connection_head = NULL; agent->connection_tail = NULL; if(!agent->sockaddr) { return MRCP_STATUS_FAILURE; } status = apr_socket_create(&agent->listen_sock, agent->sockaddr->family, SOCK_STREAM, APR_PROTO_TCP, agent->pool); if(status != APR_SUCCESS) { return MRCP_STATUS_FAILURE; } apr_socket_opt_set(agent->listen_sock, APR_SO_NONBLOCK, 0); apr_socket_timeout_set(agent->listen_sock, -1); apr_socket_opt_set(agent->listen_sock, APR_SO_REUSEADDR, 1); status = apr_socket_bind(agent->listen_sock, agent->sockaddr); if(status != APR_SUCCESS) { apr_socket_close(agent->listen_sock); agent->listen_sock = NULL; return MRCP_STATUS_FAILURE; } status = apr_socket_listen(agent->listen_sock, SOMAXCONN); if(status != APR_SUCCESS) { apr_socket_close(agent->listen_sock); agent->listen_sock = NULL; return MRCP_STATUS_FAILURE; } status = apr_pollset_create(&agent->pollset, MRCP_CLIENT_MAX_COUNT + 2, agent->pool, 0); if(status != APR_SUCCESS) { apr_socket_close(agent->listen_sock); agent->listen_sock = NULL; return MRCP_STATUS_FAILURE; } agent->listen_sock_pfd.desc_type = APR_POLL_SOCKET; agent->listen_sock_pfd.reqevents = APR_POLLIN; agent->listen_sock_pfd.desc.s = agent->listen_sock; agent->listen_sock_pfd.client_data = agent->listen_sock; status = apr_pollset_add(agent->pollset, &agent->listen_sock_pfd); if(status != APR_SUCCESS) { apr_socket_close(agent->listen_sock); agent->listen_sock = NULL; apr_pollset_destroy(agent->pollset); agent->pollset = NULL; return MRCP_STATUS_FAILURE; } return MRCP_STATUS_SUCCESS; } static mrcp_status_t mrcp_server_agent_socket_destroy(mrcp_v2_agent_t *agent) { mrcp_v2_connection_t *connection = agent->connection_head; for(; connection; connection = agent->connection_head) { if(connection->sock) { apr_pollset_remove(agent->pollset,&connection->sock_pfd); apr_socket_close(connection->sock); } agent->connection_head = connection->next; apr_pool_destroy(connection->pool); } agent->connection_head = NULL; agent->connection_tail = NULL; if(agent->listen_sock) { apr_pollset_remove(agent->pollset,&agent->listen_sock_pfd); apr_socket_close(agent->listen_sock); agent->listen_sock = NULL; } if(agent->pollset) { apr_pollset_destroy(agent->pollset); agent->pollset = NULL; } return MRCP_STATUS_SUCCESS; } static mrcp_status_t mrcp_server_agent_connection_accept(mrcp_v2_agent_t *agent) { apr_socket_t *sock; apr_sockaddr_t *sockaddr; mrcp_v2_connection_t *connection; char *remote_ip; apr_status_t status = apr_socket_accept(&sock, agent->listen_sock, agent->pool); if(status != APR_SUCCESS) { return MRCP_STATUS_FAILURE; } apr_socket_addr_get(&sockaddr,APR_REMOTE,sock); apr_sockaddr_ip_get(&remote_ip,sockaddr); connection = mrcp_server_agent_connection_find(agent,remote_ip); if(connection && !connection->sock) { apt_log(APT_PRIO_NOTICE,"MRCPv2 Client Connected\n"); connection->sock = sock; connection->sock_pfd.desc_type = APR_POLL_SOCKET; connection->sock_pfd.reqevents = APR_POLLIN; connection->sock_pfd.desc.s = connection->sock; connection->sock_pfd.client_data = connection; apr_pollset_add(agent->pollset, &connection->sock_pfd); } else { apt_log(APT_PRIO_INFO,"MRCPv2 Client Rejected\n"); apr_socket_close(sock); } return MRCP_STATUS_SUCCESS; } static void mrcp_server_agent_on_start(void *data) { apt_task_msg_t *task_msg; mrcp_v2_agent_t *agent = data; task_msg = apt_producer_task_msg_get(agent->producer_task); if(task_msg) { mrcp_proto_agent_event_t *agent_event = (mrcp_proto_agent_event_t*)task_msg->data; agent_event->id = PROTO_AGENT_EVENT_STARTED; agent->proto_agent.module.signal(&agent->proto_agent.module,task_msg); } } static void mrcp_server_agent_on_terminate_request(void *data) { mrcp_v2_agent_t *agent = data; if(agent->listen_sock) { apr_socket_t *tsock; /* terminating socket */ if(apr_socket_create(&tsock, agent->sockaddr->family, SOCK_STREAM, APR_PROTO_TCP, agent->pool) == APR_SUCCESS) { apr_socket_connect(tsock, agent->sockaddr); apr_socket_close(tsock); } } } static void mrcp_server_agent_on_terminate_complete(void *data) { apt_task_msg_t *task_msg; mrcp_v2_agent_t *agent = data; task_msg = apt_producer_task_msg_get(agent->producer_task); if(task_msg) { mrcp_proto_agent_event_t *agent_event = (mrcp_proto_agent_event_t*)task_msg->data; agent_event->id = PROTO_AGENT_EVENT_TERMINATED; agent->proto_agent.module.signal(&agent->proto_agent.module,task_msg); } } static void mrcp_server_agent_main_loop(void *data) { mrcp_v2_agent_t *agent = data; apt_task_msg_t *task_msg; apr_status_t status; apr_int32_t num; const apr_pollfd_t *ret_pfd; int i; if(!agent || !agent->pollset) { apt_log(APT_PRIO_WARNING,"Failed to Start MRCPv2 Agent\n"); return; } while(apt_producer_task_is_terminating(agent->producer_task) != TRUE) { status = apr_pollset_poll(agent->pollset, -1, &num, &ret_pfd); if(status != APR_SUCCESS) { continue; } for(i = 0; i < num; i++) { if(ret_pfd[i].desc.s == agent->listen_sock) { if(apt_producer_task_is_terminating(agent->producer_task) == TRUE) { break; } apt_log(APT_PRIO_DEBUG,"Signal MRCPv2 Connection Accept\n"); task_msg = apt_producer_task_msg_get(agent->producer_task); if(task_msg) { mrcp_proto_agent_event_t *agent_event = (mrcp_proto_agent_event_t*)task_msg->data; agent_event->id = PROTO_AGENT_EVENT_ACCEPT_CONNECTION; agent->proto_agent.module.signal(&agent->proto_agent.module,task_msg); } continue; } apt_log(APT_PRIO_DEBUG,"Signal MRCPv2 Connection Process\n"); task_msg = apt_producer_task_msg_get(agent->producer_task); if(task_msg) { mrcp_proto_agent_event_t *agent_event = (mrcp_proto_agent_event_t*)task_msg->data; agent_event->id = PROTO_AGENT_EVENT_PROCESS_CONNECTION; agent_event->connection = ret_pfd[i].client_data; agent->proto_agent.module.signal(&agent->proto_agent.module,task_msg); } } } } static mrcp_status_t mrcp_server_agent_message_send(mrcp_connection_t *base, mrcp_message_t *mrcp_message) { char buffer[MRCP_MESSAGE_MAX_SIZE]; apt_text_stream_t text_stream; size_t size = sizeof(buffer)-1; mrcp_v2_connection_t *connection = base->object; if(!connection || !connection->sock) { return MRCP_STATUS_FAILURE; } text_stream.buffer = buffer; text_stream.size = size; text_stream.pos = text_stream.buffer; mrcp_message->start_line.version = MRCP_VERSION_2; if(mrcp_message_generate(base->agent->resource_container,mrcp_message,&text_stream) != MRCP_STATUS_SUCCESS) { apt_log(APT_PRIO_WARNING,"Failed to Generate MRCPv2 Message\n"); return MRCP_STATUS_FAILURE; } *text_stream.pos = '\0'; apt_log(APT_PRIO_DEBUG,"Send MRCPv2 Message size=%lu\n%s\n",text_stream.size,text_stream.buffer); apr_socket_send(connection->sock,text_stream.buffer,&text_stream.size); return MRCP_STATUS_SUCCESS; } static mrcp_status_t mrcp_server_agent_messsage_receive(mrcp_v2_agent_t *agent, mrcp_v2_connection_t *connection) { char buffer[MRCP_MESSAGE_MAX_SIZE]; int more_messages_on_buffer = FALSE; apr_size_t size = sizeof(buffer)-1; apr_status_t status; apt_text_stream_t text_stream; mrcp_message_t *mrcp_message; if(!connection || !connection->sock) { return MRCP_STATUS_FAILURE; } status = apr_socket_recv(connection->sock, buffer, &size); if(status == APR_EOF || size == 0) { apt_log(APT_PRIO_NOTICE,"MRCPv2 Client Disconnected\n"); apr_pollset_remove(agent->pollset,&connection->sock_pfd); apr_socket_close(connection->sock); connection->sock = NULL; if(!connection->access_count) { apr_pool_destroy(connection->pool); } return MRCP_STATUS_SUCCESS; } buffer[size] = '\0'; text_stream.buffer = buffer; text_stream.size = size; text_stream.pos = buffer; apt_log(APT_PRIO_DEBUG,"Receive MRCPv2 Message size=%lu\n%s\n",text_stream.size,text_stream.buffer); if(!connection->access_count) { return MRCP_STATUS_FAILURE; } do { mrcp_message = mrcp_message_create(connection->pool); if(mrcp_message_parse(agent->proto_agent.resource_container,mrcp_message,&text_stream) == MRCP_STATUS_SUCCESS) { apt_log(APT_PRIO_DEBUG,"Signal MRCPv2 Message\n"); connection->base->event_set->on_receive(connection->base,mrcp_message); } else { mrcp_message_t response_message; apt_log(APT_PRIO_WARNING,"Failed to Parse MRCPv2 Message\n"); if(mrcp_message->start_line.version == MRCP_VERSION_2) { /* assume that at least message length field is valid */ if(mrcp_message->start_line.length <= text_stream.size) { /* skip to the end of the message */ text_stream.pos = text_stream.buffer + mrcp_message->start_line.length; } else { /* skip to the end of the buffer (support incomplete) */ text_stream.pos = text_stream.buffer + text_stream.size; } } mrcp_message_init(&response_message,connection->pool); mrcp_response_init_by_request(&response_message,mrcp_message); response_message.start_line.status_code = MRCP_STATUS_CODE_UNRECOGNIZED_MESSAGE; if(mrcp_server_agent_message_send(connection->base,&response_message) != MRCP_STATUS_SUCCESS) { apt_log(APT_PRIO_WARNING,"Failed to Send MRCPv2 Response\n"); } } more_messages_on_buffer = FALSE; if(text_stream.size > (size_t)(text_stream.pos - text_stream.buffer)) { /* there are more MRCPv2 messages to signal */ more_messages_on_buffer = TRUE; text_stream.size -= text_stream.pos - text_stream.buffer; text_stream.buffer = text_stream.pos; apt_log(APT_PRIO_DEBUG,"Saving remaining buffer for next message...\n"); } } while(more_messages_on_buffer); return MRCP_STATUS_SUCCESS; } static APR_INLINE mrcp_v2_agent_t* mrcp_v2_agent_get(mrcp_module_t *module) { return ((mrcp_proto_agent_t*)module)->object; } static mrcp_status_t mrcp_server_agent_signal_handler(mrcp_module_t *module, apt_task_msg_t *msg) { mrcp_v2_agent_t *agent = mrcp_v2_agent_get(module); mrcp_proto_agent_event_t *agent_event = (mrcp_proto_agent_event_t*)msg->data; if(!agent || !agent_event) { return MRCP_STATUS_FAILURE; } switch(agent_event->id) { case PROTO_AGENT_EVENT_PROCESS_CONNECTION: mrcp_server_agent_messsage_receive(agent,agent_event->connection); break; case PROTO_AGENT_EVENT_ACCEPT_CONNECTION: mrcp_server_agent_connection_accept(agent); break; case PROTO_AGENT_EVENT_STARTED: if(module->event_set->on_open) { module->event_set->on_open(module); } break; case PROTO_AGENT_EVENT_TERMINATED: mrcp_server_agent_socket_destroy(agent); if(module->event_set->on_close) { module->event_set->on_close(module); } break; } return MRCP_STATUS_SUCCESS; } static mrcp_v2_connection_t* mrcp_server_agent_connection_find(mrcp_v2_agent_t *agent, const char *remote_ip) { mrcp_v2_connection_t *connection = agent->connection_head; for(; connection; connection = connection->next) { if(strcmp(connection->remote_ip,remote_ip) == 0) { return connection; } } return NULL; } static mrcp_connection_t* mrcp_server_agent_connection_add(mrcp_proto_agent_t *proto_agent, mrcp_signaling_channel_t *channel, const char *remote_ip) { mrcp_v2_agent_t *agent = proto_agent->object; mrcp_v2_connection_t *connection; if(!remote_ip) { return NULL; } connection = mrcp_server_agent_connection_find(agent,remote_ip); if(!connection) { mrcp_connection_t *base; apr_pool_t *pool; if(apr_pool_create(&pool,NULL) != APR_SUCCESS) { return NULL; } base = apr_palloc(pool,sizeof(mrcp_connection_t)); base->agent = proto_agent; base->method_set = &connection_method_set; base->event_set = proto_agent->connection_event_set; connection = apr_palloc(pool,sizeof(mrcp_v2_connection_t)); connection->pool = pool; connection->base = base; base->object = connection; connection->remote_ip = apr_pstrdup(pool,remote_ip); connection->access_count = 0; connection->sock = NULL; connection->next = NULL; connection->prev = agent->connection_tail; if(agent->connection_tail) { agent->connection_tail->next = connection; agent->connection_tail = connection; } else { agent->connection_head = agent->connection_tail = connection; } } connection->access_count++; return connection->base; } static mrcp_status_t mrcp_server_agent_connection_remove(mrcp_connection_t *base) { mrcp_v2_agent_t *agent = base->agent->object; mrcp_v2_connection_t *connection = base->object; if(!connection || !connection->access_count) { return MRCP_STATUS_FAILURE; } connection->access_count--; if(!connection->access_count) { if(connection->prev) { connection->prev->next = connection->next; } else { agent->connection_head = connection->next; } if(connection->next) { connection->next->prev = connection->prev; } else { agent->connection_tail = connection->prev; } connection->next = NULL; connection->prev = NULL; if(!connection->sock) { apr_pool_destroy(connection->pool); } } return MRCP_STATUS_SUCCESS; } static mrcp_module_state_t mrcp_server_agent_open(mrcp_module_t *module) { mrcp_v2_agent_t *agent = mrcp_v2_agent_get(module); apt_log(APT_PRIO_INFO,"Open MRCPv2 Agent\n"); if(mrcp_server_agent_socket_create(agent) != MRCP_STATUS_SUCCESS) { apt_log(APT_PRIO_WARNING,"Failed to Open MRCPv2 Agent\n"); return MODULE_STATE_NONE; } apt_producer_task_start(agent->producer_task); return MODULE_STATE_OPEN_INPROGRESS; } static mrcp_module_state_t mrcp_server_agent_close(mrcp_module_t *module) { mrcp_v2_agent_t *agent = mrcp_v2_agent_get(module); apt_log(APT_PRIO_INFO,"Close MRCPv2 Agent\n"); apt_producer_task_terminate(agent->producer_task,FALSE); return MODULE_STATE_CLOSE_INPROGRESS; } static mrcp_status_t mrcp_server_agent_destroy(mrcp_module_t *module) { mrcp_v2_agent_t *agent = mrcp_v2_agent_get(module); apt_log(APT_PRIO_NOTICE,"Destroy MRCPv2 Agent\n"); if(agent->producer_task) { apt_producer_task_destroy(agent->producer_task); } agent->pool = NULL; return MRCP_STATUS_SUCCESS; } mrcp_proto_agent_t* mrcp_server_v2_agent_create(const char *listen_ip, unsigned short listen_port, apr_pool_t *pool) { mrcp_v2_agent_t *agent; apt_task_msg_pool_t *msg_pool; apt_log(APT_PRIO_NOTICE,"Create MRCPv2 Agent TCP %s:%hu\n",listen_ip,listen_port); agent = apr_palloc(pool,sizeof(mrcp_v2_agent_t)); agent->pool = pool; agent->listen_sock = NULL; agent->pollset = NULL; agent->sockaddr = NULL; agent->proto_agent.object = agent; agent->proto_agent.agent_method_set = &agent_method_set; apr_sockaddr_info_get(&agent->sockaddr,listen_ip,APR_INET,listen_port,0,agent->pool); if(!agent->sockaddr) { return NULL; } mrcp_module_init(&agent->proto_agent.module,&module_method_set); msg_pool = apt_task_msg_pool_create_waitable_static(sizeof(mrcp_proto_agent_event_t),pool); agent->producer_task = apt_producer_task_create(agent,mrcp_server_agent_main_loop,msg_pool,pool); if(agent->producer_task) { apt_task_t *task = apt_producer_task_get(agent->producer_task); apt_task_event_handler_set(task,TASK_STATE_START_IN_PROGRESS,agent,mrcp_server_agent_on_start); apt_task_event_handler_set(task,TASK_STATE_TERMINATE_REQUESTED,agent,mrcp_server_agent_on_terminate_request); apt_task_event_handler_set(task,TASK_STATE_TERMINATE_COMPLETED,agent,mrcp_server_agent_on_terminate_complete); } return &agent->proto_agent; }