/* * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP * * Copyright (c) 2020, NLnet Labs. All rights reserved. * * This software is open source. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * Neither the name of the NLNET LABS nor the names of its contributors may * be used to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ /** * \file * * An implementation of the Frame Streams data transport protocol for * the Unbound DNSTAP message logging facility. */ #ifndef DTSTREAM_H #define DTSTREAM_H #include "util/locks.h" struct dt_msg_entry; struct dt_io_list_item; struct dt_io_thread; struct config_file; struct comm_base; /** * A message buffer with dnstap messages queued up. It is per-worker. * It has locks to synchronize. If the buffer is full, a new message * cannot be added and is discarded. A thread reads the messages and sends * them. */ struct dt_msg_queue { /** lock of the buffer structure. Hold this lock to add or remove * entries to the buffer. Release it so that other threads can also * put messages to log, or a message can be taken out to send away * by the writer thread. */ lock_basic_type lock; /** the maximum size of the buffer, in bytes */ size_t maxsize; /** current size of the buffer, in bytes. data bytes of messages. * If a new message make it more than maxsize, the buffer is full */ size_t cursize; /** number of messages in the queue */ int msgcount; /** list of messages. The messages are added to the back and taken * out from the front. */ struct dt_msg_entry* first, *last; /** reference to the io thread to wakeup */ struct dt_io_thread* dtio; /** the wakeup timer for dtio, on worker event base */ struct comm_timer* wakeup_timer; }; /** * An entry in the dt_msg_queue. contains one DNSTAP message. * It is malloced. */ struct dt_msg_entry { /** next in the list. */ struct dt_msg_entry* next; /** the buffer with the data to send, an encoded DNSTAP message */ void* buf; /** the length to send. */ size_t len; }; /** * Containing buffer and counter for reading DNSTAP frames. */ struct dt_frame_read_buf { /** Buffer containing frame, except length counter(s). */ void* buf; /** Number of bytes written to buffer. */ size_t buf_count; /** Capacity of the buffer. */ size_t buf_cap; /** Frame length field. Will contain the 2nd length field for control * frames. */ uint32_t frame_len; /** Number of bytes that have been written to the frame_length field. */ size_t frame_len_done; /** Set to 1 if this is a control frame, 0 otherwise (ie data frame). */ int control_frame; }; /** * IO thread that reads from the queues and writes them. */ struct dt_io_thread { /** the thread number for the dtio thread, * must be first to cast thread arg to int* in checklock code. */ int threadnum; /** event base, for event handling */ void* event_base; /** list of queues that is registered to get written */ struct dt_io_list_item* io_list; /** iterator point in the io_list, to pick from them in a * round-robin fashion, instead of only from the first when busy. * if NULL it means start at the start of the list. */ struct dt_io_list_item* io_list_iter; /** thread id, of the io thread */ ub_thread_type tid; /** if the io processing has started */ int started; /** ssl context for the io thread, for tls connections. type SSL_CTX* */ void* ssl_ctx; /** if SNI will be used for TLS connections. */ int tls_use_sni; /** file descriptor that the thread writes to */ int fd; /** event structure that the thread uses */ void* event; /** the event is added */ int event_added; /** event added is a write event */ int event_added_is_write; /** check for nonblocking connect errors on fd */ int check_nb_connect; /** ssl for current connection, type SSL* */ void* ssl; /** true if the handshake for SSL is done, 0 if not */ int ssl_handshake_done; /** true if briefly the SSL wants a read event, 0 if not. * This happens during negotiation, we then do not want to write, * but wait for a read event. */ int ssl_brief_read; /** true if SSL_read is waiting for a write event. Set back to 0 after * single write event is handled. */ int ssl_brief_write; /** the buffer that currently getting written, or NULL if no * (partial) message written now */ void* cur_msg; /** length of the current message */ size_t cur_msg_len; /** number of bytes written for the current message */ size_t cur_msg_done; /** number of bytes of the length that have been written, * for the current message length that precedes the frame */ size_t cur_msg_len_done; /** lock on wakeup_timer_enabled */ lock_basic_type wakeup_timer_lock; /** if wakeup timer is enabled in some thread */ int wakeup_timer_enabled; /** command pipe that stops the pipe if closed. Used to quit * the program. [0] is read, [1] is written to. */ int commandpipe[2]; /** the event to listen to the commandpipe */ void* command_event; /** the io thread wants to exit */ int want_to_exit; /** in stop flush, this is nonNULL and references the stop_ev */ void* stop_flush_event; /** the timer event for connection retries */ void* reconnect_timer; /** if the reconnect timer is added to the event base */ int reconnect_is_added; /** the current reconnection timeout, it is increased with * exponential backoff, in msec */ int reconnect_timeout; /** If the log server is connected to over unix domain sockets, * eg. a file is named that is created to log onto. */ int upstream_is_unix; /** if the log server is connected to over TCP. The ip address and * port are used */ int upstream_is_tcp; /** if the log server is connected to over TLS. ip address, port, * and client certificates can be used for authentication. */ int upstream_is_tls; /** Perform bidirectional Frame Streams handshake before sending * messages. */ int is_bidirectional; /** Set if the READY control frame has been sent. */ int ready_frame_sent; /** Set if valid ACCEPT frame is received. */ int accept_frame_received; /** (partially) read frame */ struct dt_frame_read_buf read_frame; /** the file path for unix socket (or NULL) */ char* socket_path; /** the ip address and port number (or NULL) */ char* ip_str; /** is the TLS upstream authenticated by name, if nonNULL, * we use the same cert bundle as used by other TLS streams. */ char* tls_server_name; /** are client certificates in use */ int use_client_certs; /** client cert files: the .key file */ char* client_key_file; /** client cert files: the .pem file */ char* client_cert_file; }; /** * IO thread list of queues list item * lists a worker queue that should be looked at and sent to the log server. */ struct dt_io_list_item { /** next in the list of buffers to inspect */ struct dt_io_list_item* next; /** buffer of this worker */ struct dt_msg_queue* queue; }; /** * Create new (empty) worker message queue. Limit set to default on max. * @param base: event base for wakeup timer. * @return NULL on malloc failure or a new queue (not locked). */ struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base); /** * Delete a worker message queue. It has to be unlinked from access, * so it can be deleted without lock worries. The queue is emptied (deleted). * @param mq: message queue. */ void dt_msg_queue_delete(struct dt_msg_queue* mq); /** * Submit a message to the queue. The queue is locked by the routine, * the message is inserted, and then the queue is unlocked so the * message can be picked up by the writer thread. * @param mq: message queue. * @param buf: buffer with message (dnstap contents). * The buffer must have been malloced by caller. It is linked in * the queue, and is free()d after use. If the routine fails * the buffer is freed as well (and nothing happens, the item * could not be logged). * @param len: length of buffer. */ void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); /** timer callback to wakeup dtio thread to process messages */ void mq_wakeup_cb(void* arg); /** * Create IO thread. * @return new io thread object. not yet started. or NULL malloc failure. */ struct dt_io_thread* dt_io_thread_create(void); /** * Delete the IO thread structure. * @param dtio: the io thread that is deleted. It must not be running. */ void dt_io_thread_delete(struct dt_io_thread* dtio); /** * Apply config to the dtio thread * @param dtio: io thread, not yet started. * @param cfg: config file struct. * @return false on malloc failure. */ int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg); /** * Register a msg queue to the io thread. It will be polled to see if * there are messages and those then get removed and sent, when the thread * is running. * @param dtio: the io thread. * @param mq: message queue to register. * @return false on failure (malloc failure). */ int dt_io_thread_register_queue(struct dt_io_thread* dtio, struct dt_msg_queue* mq); /** * Unregister queue from io thread. * @param dtio: the io thread. * @param mq: message queue. */ void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, struct dt_msg_queue* mq); /** * Start the io thread * @param dtio: the io thread. * @param event_base_nothr: the event base to attach the events to, in case * we are running without threads. With threads, this is ignored * and a thread is started to process the dnstap log messages. * @param numworkers: number of worker threads. The dnstap io thread is * that number +1 as the threadnumber (in logs). * @return false on failure. */ int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, int numworkers); /** * Stop the io thread * @param dtio: the io thread. */ void dt_io_thread_stop(struct dt_io_thread* dtio); /** callback for the dnstap reconnect, to start reconnecting to output */ void dtio_reconnect_timeout_cb(int fd, short bits, void* arg); /** callback for the dnstap events, to write to the output */ void dtio_output_cb(int fd, short bits, void* arg); /** callback for the dnstap commandpipe, to stop the dnstap IO */ void dtio_cmd_cb(int fd, short bits, void* arg); /** callback for the timer when the thread stops and wants to finish up */ void dtio_stop_timer_cb(int fd, short bits, void* arg); /** callback for the output when the thread stops and wants to finish up */ void dtio_stop_ev_cb(int fd, short bits, void* arg); /** callback for unbound-dnstap-socket */ void dtio_tap_callback(int fd, short bits, void* arg); /** callback for unbound-dnstap-socket */ void dtio_mainfdcallback(int fd, short bits, void* arg); #endif /* DTSTREAM_H */