smtsock.c


/*  ----------------------------------------------------------------<Prolog>-
    Name:       smtsock.c
    Title:      SMT socket i/o agent
    Package:    Libero/SMT Kernel 2.x

    Written:    1996/06/16  iMatix SMT kernel team smt@imatix.com
    Revised:    1999/06/19

    Synopsis:   Waits, reads, writes socket data.

    Copyright:  Copyright (c) 1991-2000 iMatix Corporation
    License:    This is free software; you can redistribute it and/or modify
                it under the terms of the SMT License Agreement as provided
                in the file LICENSE.TXT.  This software is distributed in
                the hope that it will be useful, but without any warranty.
 ------------------------------------------------------------------</Prolog>-*/

#include "smtdefn.h"                    /*  SMT definitions                  */


/*- Definitions -------------------------------------------------------------*/

#define AGENT_NAME      SMT_SOCKET      /*  Our public name                  */
#define SINGLE_THREADED TRUE            /*  Single-threaded agent            */

typedef struct _SOCKREQ {               /*  Request descriptor               */
    struct _SOCKREQ                     /*                                   */
           *next, *prev;                /*    Doubly-linked list             */
    QID     reply_to;                   /*    Who sent the request event     */
    sock_t  input;                      /*    Socket for input               */
    sock_t  output;                     /*    Socket for output              */
    byte   *buffer;                     /*    Buffer for i/o, or NULL        */
    qbyte   max_size;                   /*    Maximum size of buffer         */
    qbyte   cur_size;                   /*    Current size of buffer         */
    qbyte   min_size;                   /*    Minimum data to process        */
    dbyte   timeout;                    /*    Expiry time in seconds         */
    time_t  expires;                    /*    Expiry time, or 0              */
    qbyte   tag;                        /*    User-defined request tag       */
    Bool    repeat;                     /*    Repeated request?              */
    Bool    huge_block;                 /*    Huge blocks?                   */
} SOCKREQ;


/*- Function prototypes -----------------------------------------------------*/

static SOCKREQ *request_create    (THREAD *thread,
                                   dbyte timeout, sock_t handle, qbyte tag);
static sock_t   request_handle    (SOCKREQ *request);
static void     request_destroy   (SOCKREQ *request);
static void     handle_partial_io (SOCKREQ *request, int bytes_done);
static void     reply_error       (QID *qid, sock_t handle, char *message,
                                   qbyte tag);
static void     reply_normal      (SOCKREQ *request, char *event_name);
static void     purge_old_request (sock_t input, sock_t output);


/*- Global variables used in this source file only --------------------------*/

static Bool
    had_activity = FALSE,
    trace_flag = FALSE;                 /*  Trace socket activity?           */
static NODE
    requests;                           /*  Request list header              */
static fd_set
    read_set,                           /*  Sockets to check for input       */
    write_set,                          /*  Sockets to check for output      */
    error_set;                          /*  Sockets to check for errors      */
static QID
    operq;                              /*  Operator console event queue     */
static SOCKREQ
    *request,                           /*  Pointer to request (in list)     */
    *active_request;                    /*  Request we're processing         */

static byte
    msg_body [LINE_MAX];                /*  Message sent to requestors       */
static int
    msg_size;                           /*  Size of formatted msg_body       */
static DESCR                            /*  Descriptor for exdr_write        */
    msg = { LINE_MAX, msg_body };


#include "smtsock.d"                    /*  Include dialog data              */

/********************   INITIALISE AGENT - ENTRY POINT   *********************/

/*  ---------------------------------------------------------------------[<]-
    Function: smtsock_init

    Synopsis: Initialises the SMT socket agent.  Returns 0 if initialised
    okay, -1 if there was an error.  The socket agent manages all sockets
    (TCP and UPD) used by an SMT application.  Creates an unnamed thread
    automatically: send events to that thread.  Initialises the sflsock
    socket interface automatically.  Supports these public methods:
    <Table>
    READ      Read a specified amount of input data (use SMT_SOCK_READ).
    WRITE     Write a specified amount of output data (use SMT_SOCK_WRITE).
    READR     Read input data, repeatedly (use SMT_SOCK_READ).
    READH     As for READ, but for blocks > 64k (use SMT_SOCK_READH).
    WRITEH    As for WRITE, but for blocks > 64k (use SMT_SOCK_WRITEH).
    READRH    As for READR, but for blocks > 64k (use SMT_SOCK_READH).
    INPUT     Wait for any input ready on socket (use SMT_SOCK_INPUT).
    INPUTR    Wait for any input, repeatedly (use SMT_SOCK_INPUT).
    OUTPUT    Wait for any output ready on socket (use SMT_SOCK_OUTPUT).
    CONNECT   Make socket connection to host & port (use SMT_SOCK_CONNECT).
    FLUSH     Delete all requests for specified socket (use SMT_SOCK_FLUSH).
    </Table>
    Sends errors to the SMTOPER agent; see doc for reply events.
    ---------------------------------------------------------------------[>]-*/

int
smtsock_init (void)
{
    AGENT   *agent;                     /*  Handle for our agent             */
    THREAD  *thread;                    /*  Handle to console thread         */
#   include "smtsock.i"                 /*  Include dialog interpreter       */

    /*  We give this agent a low priority, so that it will only run after    */
    /*  all other threads.  This is important, since it blocks on select().  */
    agent-> priority = SMT_PRIORITY_LOW;

    /*                      Method name     Event value      Priority        */
    /*  Shutdown event comes from Kernel                                     */
    method_declare (agent, "SHUTDOWN",      shutdown_event,  SMT_PRIORITY_MAX);

    /*  Public methods supported by this agent                               */
    method_declare (agent, "READ",          read_event,      0);
    method_declare (agent, "READR",         readr_event,     0);
    method_declare (agent, "READH",         readh_event,     0);
    method_declare (agent, "READRH",        readrh_event,    0);
    method_declare (agent, "WRITE",         write_event,     0);
    method_declare (agent, "WRITEH",        writeh_event,    0);
    method_declare (agent, "INPUT",         input_event,     0);
    method_declare (agent, "INPUTR",        inputr_event,    0);
    method_declare (agent, "OUTPUT",        output_event,    0);
    method_declare (agent, "CONNECT",       connect_event,   0);
    method_declare (agent, "FLUSH",         flush_event,     0);

    /*  Private method used to cycle on select() call                        */
    method_declare (agent, "_TIMEOUT",      timeout_event,   0);

    /*  Ensure that operator console is running, else start it up            */
    if (agent_lookup (SMT_OPERATOR) == NULL)
        smtoper_init ();
    if ((thread = thread_lookup (SMT_OPERATOR, "")) != NULL)
        operq = thread-> queue-> qid;
    else
        return (-1);

    /*  Initialise the socket interface and register sock_term()             */
    if (sock_init () == 0)
        smt_atexit ((function) sock_term);
    else
      {
        sendfmt (&operq, "ERROR",
                 "smtsock: could not initialise socket interface");
        sendfmt (&operq, "ERROR",
                 "smtsock: %s", connect_errlist [connect_error ()]);
        return (-1);
      }

    ip_nonblock = TRUE;                  /*  Want nonblocking sockets        */

    /*  Create initial, unnamed thread                                       */
    thread_create (AGENT_NAME, "");

    /*  Signal okay to caller that we initialised okay                       */
    return (0);
}

/*  ---------------------------------------------------------------------[<]-
    Function: smtsock_trace

    Synopsis: Enables/disables socket tracing: to enable, call with TRUE as
    argument; to disable call with FALSE as argument.  Socket trace data is
    sent to the console.
    ---------------------------------------------------------------------[>]-*/

void smtsock_trace (Bool trace_value)
{
    trace_flag = trace_value;
}


/*************************   INITIALISE THE THREAD   *************************/

MODULE initialise_the_thread (THREAD *thread)
{
    node_reset (&requests);             /*  Initialise requests list         */
    the_next_event = ok_event;
}


/***********************   GET NEXT EVENT FROM QUEUE   ***********************/

MODULE get_next_event_from_queue (THREAD *thread)
{
    AGENT   *agent;                     /*  This agent                       */
    QUEUE   *queue;                     /*  Thread's event queue             */
    EVENT   *event;                     /*  Event information block          */
    METHOD  *method;                    /*  Method information block         */

    /*  Get next event off queue                                             */
    queue = thread-> queue;
    agent = queue-> agent;
    event = event_iterate (queue, NULL);
    if (event)
      {
        method = method_lookup (agent, event-> name);
        if (method == NULL)             /*  Not a method we accept           */
          {
            event_reject (queue, event);
            the_next_event = invalid_event;
          }
        else
          {
            if (thread-> event)         /*  If thread was sitting on an      */
              {                         /*    event, release it              */
                event_destroy (thread-> event);
                thread-> event = NULL;
              }
            /*  Get event off queue; it now belongs to the thread            */
            thread-> event = event_accept (queue, event);
            the_next_event = method-> event_number;
          }
      }
    else
        the_next_event = empty_event;
}


/**************************   CREATE READ REQUEST   **************************/

MODULE create_read_request (THREAD *thread)
{
    dbyte
        timeout,                        /*  Timeout, in seconds, or zero     */
        read_size,                      /*  Size of data buffer, bytes       */
        min_size;                       /*  Minimum amount of data to read   */
    sock_t
        handle;                         /*  Socket handle                    */
    qbyte
        tag;                            /*  User-defined request tag         */

    /*  Get arguments from message                                           */
    exdr_read (thread-> event-> body, SMT_SOCK_READ,
               &timeout, &handle, &read_size, &min_size, &tag);

    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: READ min=%d max=%d socket=%ld timeout=%d",
                  min_size, read_size, handle, timeout);

    if (read_size == 0 || handle == 0)
        reply_error (&thread-> event-> sender, handle,
                     "Null read request", tag);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
      {
        purge_old_request (handle, 0);
        request-> input    = handle;
        request-> max_size = read_size;
        request-> min_size = min_size? min_size: read_size;
        request-> buffer   = mem_alloc (read_size);
        if (request-> buffer == NULL)
            raise_exception (exception_event);
      }
}


/*  -------------------------------------------------------------------------
 *  request_create
 *
 *  Creates a new request, and initialises it to empty.  If the request
 *  could not be created, sends an SOCK_ERROR event to the caller, and
 *  returns null.  Otherwise returns the address of the created request.
 */

static SOCKREQ *
request_create (THREAD *thread, dbyte timeout, sock_t handle, qbyte tag)
{
    SOCKREQ
        *request;                       /*  Request we create                */

    if ((request = node_create (requests.prev, sizeof (SOCKREQ))) == NULL)
        reply_error (&thread-> event-> sender, handle, "Out of memory", tag);
    else
      {
        /*  Initialise the request with default values                       */
        request-> reply_to   = thread-> event-> sender;
        request-> input      = 0;
        request-> output     = 0;
        request-> buffer     = NULL;
        request-> max_size   = 0;
        request-> cur_size   = 0;
        request-> min_size   = 0;
        request-> tag        = tag;
        request-> repeat     = FALSE;
        request-> huge_block = FALSE;
        request-> timeout    = timeout;

        /*  It's really not correct ANSI C to compute with timevals; this    */
        /*  will just have to do for now.  It may break on weird systems.    */
        request-> expires = timeout? time (NULL) + timeout: 0;
      }
    had_activity = TRUE;
    return (request);
}


/*  -------------------------------------------------------------------------
 *  reply_error
 *
 *  Formats and sends a message containing the socket number and an error
 *  message.
 */

static void
reply_error (QID *qid, sock_t handle, char *message, qbyte tag)
{
    msg_size = exdr_writed (&msg, SMT_SOCK_ERROR, message, handle, tag);
    event_send (
        qid,                            /*  Send to specified queue          */
        NULL,                           /*  No queue for reply               */
        "SOCK_ERROR",                   /*  Name of event to send            */
        msg_body,                       /*  Event body contents              */
        msg_size,                       /*  Event body size                  */
        NULL, NULL, NULL,               /*  No response events               */
        0);                             /*  No timeout                       */
}


/*  -------------------------------------------------------------------------
 *  purge_old_request
 *
 *  Removes any existing requests for the specified input or output handles.
 *  Specify either or both handles, or zero.
 */

static void
purge_old_request (sock_t input, sock_t output)
{
    SOCKREQ
        *request;                       /*  Find request in list             */

    for (request = requests.next;
         request != (SOCKREQ *) &requests;
         request  = request-> next)
      {
        if ((input  && request-> input  == input)
        ||  (output && request-> output == output))
          {
            request = request-> prev;
            request_destroy (request-> next);
          }
      }
}


/*  -------------------------------------------------------------------------
 *  request_destroy
 *
 *  Destroys the specified request.
 */

static void
request_destroy (SOCKREQ *request)
{
    /*  Free dynamically-allocated fields in the request block, as reqd.     */
    mem_free (request-> buffer);
    node_destroy (request);
}


/***********************   CREATE READ REPEAT REQUEST   **********************/

MODULE create_read_repeat_request (THREAD *thread)
{
    create_read_request (thread);
    if (request)
        request-> repeat = TRUE;
}


/************************   CREATE HUGE READ REQUEST   ***********************/

MODULE create_huge_read_request (THREAD *thread)
{
    dbyte
        timeout;                        /*  Timeout, in seconds, or zero     */
    qbyte
        read_size,                      /*  Size of data buffer, bytes       */
        min_size,                       /*  Minimum amount of data to read   */
        tag;                            /*  User-defined request tag         */
    sock_t
        handle;                         /*  Socket handle                    */

    /*  Get arguments from message                                           */
    exdr_read (thread-> event-> body, SMT_SOCK_READH,
               &timeout, &handle, &read_size, &min_size, &tag);

    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: READH min=%ld max=%ld socket=%ld timeout=%d",
                  min_size, read_size, handle, timeout);

    if (read_size == 0 || handle == 0)
        reply_error (&thread-> event-> sender, handle,
                     "Null read request", tag);
    else
    if (read_size > UINT_MAX)
        reply_error (&thread-> event-> sender, handle,
                     "Read request too large for memory model", tag);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
      {
        purge_old_request (handle, 0);
        request-> input      = handle;
        request-> max_size   = read_size;
        request-> min_size   = min_size? min_size: read_size;
        request-> buffer     = mem_alloc ((size_t) read_size);
        request-> huge_block = TRUE;
        if (request-> buffer == NULL)
            raise_exception (exception_event);
      }
}


/********************   CREATE HUGE READ REPEAT REQUEST   ********************/

MODULE create_huge_read_repeat_request (THREAD *thread)
{
    create_huge_read_request (thread);
    if (request)
        request-> repeat = TRUE;
}


/**************************   CREATE WRITE REQUEST   *************************/

MODULE create_write_request (THREAD *thread)
{
    dbyte
        timeout,                        /*  Timeout, in seconds, or zero     */
        write_size;                     /*  Amount of data to write          */
    sock_t
        handle;                         /*  Socket handle                    */
    byte
        *buffer = NULL;                 /*  Buffer to write                  */
    qbyte
        tag;                            /*  User-defined request tag         */

    /*  Get arguments from message                                           */
    exdr_read (thread-> event-> body, SMT_SOCK_WRITE,
               &timeout, &handle, &write_size, &buffer, &tag);

    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: WRITE size=%d socket=%ld time=%d data=%x %x %x %x",
                  write_size, handle, timeout,
                  (byte) buffer [0], (byte) buffer [1],
                  (byte) buffer [2], (byte) buffer [3]);

    if (write_size == 0 || handle == 0)
      {
        mem_free (buffer);
        reply_error (&thread-> event-> sender, handle,
                     "Null write request", tag);
      }
    else
    if (write_size > UINT_MAX)
      {
        mem_free (buffer);
        reply_error (&thread-> event-> sender, handle,
                     "Write request too large for memory model", tag);
      }
    else
    if (buffer == NULL)                 /*  Not enough memory                */
        raise_exception (exception_event);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
      {
        request-> output   = handle;
        request-> max_size = write_size;
        request-> min_size = write_size;
        request-> buffer   = buffer;
      }
}


/***********************   CREATE HUGE WRITE REQUEST   ***********************/

MODULE create_huge_write_request (THREAD *thread)
{
    dbyte
        timeout;                        /*  Timeout, in seconds, or zero     */
    qbyte
        write_size,                     /*  Amount of data to write          */
        tag;                            /*  User-defined request tag         */
    sock_t
        handle;                         /*  Socket handle                    */
    byte
        *buffer = NULL;                 /*  Buffer to write                  */

    /*  Get arguments from message                                           */
    exdr_read (thread-> event-> body, SMT_SOCK_WRITEH,
               &timeout, &handle, &write_size, &buffer, &tag);

    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: WRITEH size=%ld socket=%ld time=%d data=%x%x%x%x",
                  write_size, handle, timeout,
                  (byte) buffer [0], (byte) buffer [1],
                  (byte) buffer [2], (byte) buffer [3]);

    if (write_size == 0 || handle == 0)
      {
        mem_free (buffer);
        reply_error (&thread-> event-> sender, handle,
                     "Null write request", tag);
      }
    else
    if (buffer == NULL)                 /*  Not enough memory                */
        raise_exception (exception_event);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
      {
        request-> output     = handle;
        request-> max_size   = write_size;
        request-> min_size   = write_size;
        request-> buffer     = buffer;
        request-> huge_block = TRUE;
      }
}


/**************************   CREATE INPUT REQUEST   *************************/

MODULE create_input_request (THREAD *thread)
{
    dbyte
        timeout;                        /*  Timeout, in seconds, or zero     */
    sock_t
        handle;                         /*  Socket handle                    */
    qbyte
        tag;                            /*  User-defined request tag         */

    /*  Get arguments from message                                           */
    exdr_read (thread-> event-> body, SMT_SOCK_INPUT,
               &timeout, &handle, &tag);

    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: INPUT socket=%ld timeout=%d", handle, timeout);

    if (handle == 0)
        reply_error (&thread-> event-> sender, handle,
                     "Null input request", tag);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
      {
        purge_old_request (handle, 0);
        request-> input = handle;
      }
}


/**********************   CREATE INPUT REPEAT REQUEST   **********************/

MODULE create_input_repeat_request (THREAD *thread)
{
    create_input_request (thread);
    if (request)
        request-> repeat = TRUE;
}


/*************************   CREATE OUTPUT REQUEST   *************************/

MODULE create_output_request (THREAD *thread)
{
    dbyte
        timeout;                        /*  Timeout, in seconds, or zero     */
    sock_t
        handle;                         /*  Socket handle                    */
    qbyte
        tag;                            /*  User-defined request tag         */

    /*  Get arguments from message                                           */
    exdr_read (thread-> event-> body, SMT_SOCK_OUTPUT,
               &timeout, &handle, &tag);

    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: OUTPUT socket=%ld timeout=%d", handle, timeout);

    if (handle == 0)
        reply_error (&thread-> event-> sender, handle,
                     "Null output request", tag);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
      {
        purge_old_request (0, handle);
        request-> output = handle;
      }
}


/*************************   CREATE CONNECT REQUEST   ************************/

MODULE create_connect_request (THREAD *thread)
{
    dbyte
        timeout,                        /*  Timeout, in seconds, or zero     */
        port_nbr;                       /*  Literal port number              */
    char
        *type = NULL,                   /*  Type of connection to make       */
        *host = NULL,                   /*  Host to connect to               */
        *service = NULL;                /*  Service or port number           */
    qbyte
        host_nbr;                       /*  Literal host number              */
    struct sockaddr_in
        host_addr;                      /*  Structure for connection         */
    sock_t
        handle;                         /*  Handle for connection            */
    qbyte
        tag;                            /*  User-defined request tag         */

    /*  Get arguments from message                                           */
    exdr_read (thread-> event-> body, SMT_SOCK_CONNECT,
               &timeout, &type, &host, &service, &port_nbr, &host_nbr, &tag);

    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: CONNECT type=%s to=%s/%s nbr=%lx/%d timeout=%d",
                  type, host, service, host_nbr, port_nbr, timeout);

    /*  Build socket address structure and connect to host.  Either of the   */
    /*  information pairs (host, service) (host_nbr, port_nbr) will be used  */
    /*  by the connect function.                                             */
    build_sockaddr (&host_addr, host_nbr, port_nbr);
    handle = connect_socket (host, service, type, &host_addr, 3, 0);

    /*  The connect call can fail, in which case we return the socket error  */
    /*  message.  If the call succeeds, we need to wait until the socket is  */
    /*  ready for use, since we use non-blocking sockets.  We generate a     */
    /*  write request; when this is true we'll send an ok event plus the     */
    /*  socket handle to the calling program.                                */

    if (handle == INVALID_SOCKET)
        reply_error (&thread-> event-> sender, 0, (char *) sockmsg (), tag);
    else
    if (handle > 0                      /*  Else wait until ready to write   */
    && (request = request_create (thread, timeout, handle, tag)) != NULL)
      {
        purge_old_request (0, handle);
        request-> output = handle;
      }
    mem_free (type);                    /*  Release allocated memory         */
    mem_free (host);
    mem_free (service);
}


/*************************   FLUSH SOCKET REQUESTS   *************************/

MODULE flush_socket_requests (THREAD *thread)
{
    sock_t
        handle;                         /*  Socket handle                    */

    exdr_read (thread-> event-> body, SMT_SOCK_FLUSH, &handle);
    purge_old_request (handle, handle);
}


/***********************   CHECK FOR EXPIRED REQUESTS   **********************/

MODULE check_for_expired_requests (THREAD *thread)
{
    time_t
        time_now;                       /*  Current time                     */

    time_now = time (NULL);
    for (request  = requests.next;
         request != (SOCKREQ *) &requests;
         request  = request-> next)
      {
        /*  If the request timed-out, reply SOCK_TIMEOUT and delete it       */
        if (request-> expires && request-> expires < time_now)
          {
            send_sock_timeout (&request-> reply_to, request-> timeout,
                               request_handle (request),
                               (dbyte) request-> cur_size,
                               request-> buffer, request-> tag);

            request = request-> prev;   /*  We want to continue in list      */
            request_destroy (request-> next);
          }
        else
        /*  If request socket is dead, reply SOCK_CLOSED and delete it       */
        if (!socket_is_alive (request_handle (request)))
          {
            send_sock_closed (&request-> reply_to, request-> timeout,
                              request_handle (request),
                              (dbyte) request-> cur_size,
                              request-> buffer, request-> tag);

            request = request-> prev;   /*  We want to continue in list      */
            request_destroy (request-> next);
          }
      }
    if (requests.next == &requests)
        raise_exception (no_requests_event);
}


/*  -------------------------------------------------------------------------
 *  request_handle
 *
 *  Returns the request handle (input or output, but not both).  Returns 0
 *  if neither is set; this cannot normally happen.
 */

static sock_t
request_handle (SOCKREQ *request)
{
    if (request-> input)
        return (request-> input);
    else
    if (request-> output)
        return (request-> output);
    else
        return (0);
}


/************************   WAIT FOR SOCKET ACTIVITY   ***********************/

MODULE wait_for_socket_activity (THREAD *thread)
{
    struct timeval
#if (defined (__WINDOWS__))             /*  Windows select() never unblocks  */
        timeout = { 0, 200000 };        /*  Timeout for select() = 1/5s      */
#else
        timeout = { 1, 0 };             /*  Timeout for select() = 1s        */
#endif
    sock_t
        top_socket = 0;                 /*  Highest socket number            */
    int
        rc;                             /*  Return code from select()        */

    smt_set_step ("prepare fd_sets");
    memset (&read_set,  0, sizeof (fd_set));
    memset (&write_set, 0, sizeof (fd_set));
    memset (&error_set, 0, sizeof (fd_set));
    for (request  = requests.next;
         request != (SOCKREQ *) &requests;
         request  = request-> next)
      {
        if (request-> input)
          {
            FD_SET ((int) request-> input, &read_set);
            FD_SET ((int) request-> input, &error_set);
            top_socket = max (request-> input, top_socket);
            if (trace_flag && had_activity)
                sendfmt (&operq, "INFO",
                         "smtsock: wait for input on %d", request-> input);
          }
        else
        if (request-> output)
          {
            FD_SET ((int) request-> output, &write_set);
            FD_SET ((int) request-> output, &error_set);
            top_socket = max (request-> output, top_socket);
            if (trace_flag && had_activity)
                sendfmt (&operq, "INFO",
                         "smtsock: wait for output on %d", request-> output);
          }
      }
    smt_set_step ("sock_select");
    rc = sock_select (
        (int) top_socket + 1,           /*  Handles to check                 */
        &read_set,                      /*  Check for input                  */
        &write_set,                     /*  Check for output                 */
        &error_set,                     /*  Check for errors                 */
        &timeout);                      /*  Timeout                          */

    smt_set_step ("after select");
    if (trace_flag && had_activity)
        sendfmt (&operq, "INFO",
                 "smtsock: return code from select() = %d", rc);

    /*  If select failed, send error to console, and terminate               */
    if (rc == SOCKET_ERROR)             /*  Error from socket call           */
      {
        if (sockerrno == EINTR)         /*  Ignore interrupted call          */
            raise_exception (no_activity_event);
        else
        if (sockerrno != EBADF)         /*  Ignore errors on handles         */
          {
            sendfmt (&operq, "ERROR",
                     "smtsock: error on select(): %s", sockmsg ());
            raise_exception (exception_event);
            if (trace_flag)
                sendfmt (&operq, "INFO",
                         "smtsock: error from select() = %d %s",
                          sockerrno, sockmsg ());
          }
      }
    else
    if (rc == 0)
        raise_exception (no_activity_event);

    had_activity = FALSE;
}


/**********************   CHECK FIRST SOCKET ACTIVITY   **********************/

MODULE check_first_socket_activity (THREAD *thread)
{
    request = (SOCKREQ *) requests.next;
    check_next_socket_activity (thread);
}


/***********************   CHECK NEXT SOCKET ACTIVITY   **********************/

MODULE check_next_socket_activity (THREAD *thread)
{
    /*  Check for next request with socket activity                          */
    the_next_event = finished_event;
    while (request != (SOCKREQ *) &requests)
      {
        if (FD_ISSET ((int) request-> input,  &read_set))
          {
            had_activity = TRUE;
            the_next_event = (request-> buffer? read_event: input_event);
            if (trace_flag)
                sendfmt (&operq, "INFO",
                         "smtsock: -- input ready on %d", request-> input);
          }
        else
        if (FD_ISSET ((int) request-> output, &write_set))
          {
            had_activity = TRUE;
            the_next_event = (request-> buffer? write_event: output_event);
            if (trace_flag)
                sendfmt (&operq, "INFO",
                         "smtsock: -- output ready on %d", request-> output);
          }
        /*  If any event got set, we can access it as active_request         */
        if (the_next_event != finished_event)
          {
            active_request = request;
            request = request-> next;
            break;
          }
        else
            request = request-> next;
      }
}


/*************************   READ DATA FROM SOCKET   *************************/

MODULE read_data_from_socket (THREAD *thread)
{
    /*  Read as much data as we can from the request's socket, then          */
    /*  update the request appropriately.                                    */
    if (trace_flag)
      {
        sendfmt (&operq, "INFO",
                 "smtsock: reading %d bytes from %ld",
                  active_request-> max_size - active_request-> cur_size,
                  active_request-> input);
      }
    handle_partial_io (
        active_request,
        read_TCP (active_request-> input,
                  active_request-> buffer   + active_request-> cur_size,
        (size_t) (active_request-> max_size - active_request-> cur_size))
    );
}


/*  -------------------------------------------------------------------------
 *  handle_partial_io
 *
 *  Handles the return code from a socket read or write, to update the
 *  request size indicators and set the next event.
 */

static void
handle_partial_io (SOCKREQ *request, int bytes_done)
{
    /*  If we read something, update the request cur_size, and check if      */
    /*  we got everything.  If so, we can signal 'finished'.  Else we loop.  */
    if (bytes_done > 0)
      {
        if (trace_flag)
          {
            byte *ptr = (byte *) request-> buffer + request-> cur_size;
            sendfmt (&operq, "INFO",
                     "smtsock: %02x %02x %02x %02x %02x %02x %02x %02x...",
                      ptr [0], ptr [1], ptr [2], ptr [3],
                      ptr [4], ptr [5], ptr [6], ptr [7]);
          }
        request-> cur_size += bytes_done;
        if (request-> cur_size >= request-> min_size)
            the_next_event = finished_event;
        else
            the_next_event = incomplete_event;
      }

    /*  If the return code was zero, the socket got closed.  Whatever we     */
    /*  got, we'll send back.  Some systems return EPIPE or ECONNRESET.      */
    else
    if (bytes_done == 0 || sockerrno == EPIPE || sockerrno == ECONNRESET)
        the_next_event = closed_event;
    else
    /*  In principle we can't get an EAGAIN, since we waited until the       */
    /*  socket was ready, but you never know.  We'll just try again...       */
    if (sockerrno == EAGAIN || sockerrno == EWOULDBLOCK)
        the_next_event = incomplete_event;

    /*  Anything else, that's an error                                       */
    else
        the_next_event = error_event;
}


/**************************   WRITE DATA TO SOCKET   *************************/

MODULE write_data_to_socket (THREAD *thread)
{
    /*  Write as much data as we can to the request's socket, then           */
    /*  update the request appropriately.                                    */
    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: writing %d bytes to %ld",
                  active_request-> max_size - active_request-> cur_size,
                  active_request-> output);

    handle_partial_io (
        active_request,
        write_TCP (active_request-> output,
                   active_request-> buffer   + active_request-> cur_size,
         (size_t) (active_request-> max_size - active_request-> cur_size))
    );
}


/*********************   SIGNAL SOCKET READY FOR INPUT   *********************/

MODULE signal_socket_ready_for_input (THREAD *thread)
{
    reply_normal (active_request, "SOCK_INPUT_OK");
    if (!active_request-> repeat)
        request_destroy (active_request);
}


/*  -------------------------------------------------------------------------
 *  reply_normal
 *
 *  Formats and sends a message containing a socket number only.  Used for
 *  all normal (non-read) replies - after a write or a connect.  Destroys
 *  the specified request.
 */

static void
reply_normal (SOCKREQ *request, char *event_name)
{
    msg_size = exdr_writed (&msg, SMT_SOCK_OK,
                            request_handle (request), request-> tag);
    event_send (
        &request-> reply_to,            /*  Send to specified queue          */
        NULL,                           /*  No queue for reply               */
        event_name,                     /*  Name of event to send            */
        msg_body,                       /*  Event body contents              */
        msg_size,                       /*  Event body size                  */
        NULL, NULL, NULL,               /*  No response events               */
        0);                             /*  No timeout                       */
}


/*********************   SIGNAL SOCKET READY FOR OUTPUT   ********************/

MODULE signal_socket_ready_for_output (THREAD *thread)
{
    reply_normal (active_request, "SOCK_OUTPUT_OK");
    request_destroy (active_request);
}


/**************************   SIGNAL READ COMPLETE   *************************/

MODULE signal_read_complete (THREAD *thread)
{
    if (active_request-> huge_block)
        send_sock_readh_ok (
            &active_request-> reply_to,
            active_request-> timeout,
            request_handle (active_request),
            active_request-> cur_size,
            active_request-> buffer,
            active_request-> tag);
    else
        send_sock_read_ok (
            &active_request-> reply_to,
            active_request-> timeout,
            request_handle (active_request),
            (dbyte) active_request-> cur_size,
            active_request-> buffer,
            active_request-> tag);

    if (active_request-> repeat)
        active_request-> cur_size = 0;
    else
        request_destroy (active_request);
}


/***************************   SIGNAL READ CLOSED   **************************/

MODULE signal_read_closed (THREAD *thread)
{
    /*  We send back a short data block, if any data was read                */
    send_sock_closed (
        &active_request-> reply_to,
        active_request-> timeout,
        request_handle (active_request),
        (dbyte) active_request-> cur_size,
        active_request-> buffer,
        active_request-> tag);

    request_destroy (active_request);
}


/*************************   SIGNAL WRITE COMPLETE   *************************/

MODULE signal_write_complete (THREAD *thread)
{
    if (active_request-> huge_block)
        send_sock_writeh_ok (
            &active_request-> reply_to,
            request_handle (active_request),
            active_request-> tag);
    else
        send_sock_write_ok (
            &active_request-> reply_to,
            request_handle (active_request),
            active_request-> tag);

    request_destroy (active_request);
}


/**************************   SIGNAL WRITE CLOSED   **************************/

MODULE signal_write_closed (THREAD *thread)
{
    reply_normal (active_request, "SOCK_CLOSED");
    request_destroy (active_request);
}


/**************************   SIGNAL SOCKET ERROR   **************************/

MODULE signal_socket_error (THREAD *thread)
{
    reply_error (&active_request-> reply_to, request_handle (active_request),
                 (char *) sockmsg (), active_request-> tag);
    request_destroy (active_request);
}


/***********************   SEND TIMEOUT EVENT TO SELF   **********************/

MODULE send_timeout_event_to_self (THREAD *thread)
{
    event_send (
        &thread-> queue-> qid,          /*  Send to specified queue          */
        NULL,                           /*  No queue for reply               */
        "_TIMEOUT",                     /*  Name of event to send            */
        NULL, 0,                        /*  No event body                    */
        NULL, NULL, NULL,               /*  No response events               */
        0);                             /*  No timeout                       */
}


/**************************   DESTROY ALL REQUESTS   *************************/

MODULE destroy_all_requests (THREAD *thread)
{
    while (requests.next != &requests)
        request_destroy (requests.next);
}


/*************************   TERMINATE THE THREAD   **************************/

MODULE terminate_the_thread (THREAD *thread)
{
    the_next_event = terminate_event;
}

Generated by Framer 1.0 © 1997 iMatix