aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNathanael Sensfelder <SpamShield0@MultiAgentSystems.org>2017-06-09 15:18:04 +0200
committerNathanael Sensfelder <SpamShield0@MultiAgentSystems.org>2017-06-09 15:18:04 +0200
commit980d0b8c4c16b2583e2da052ed964a7170485ce2 (patch)
tree1155e51fc4798e22c4cc2eafb05cbf725817a353
downloadlimiter-980d0b8c4c16b2583e2da052ed964a7170485ce2.zip
limiter-980d0b8c4c16b2583e2da052ed964a7170485ce2.tar.bz2
Initial commit.
-rw-r--r--CMakeLists.txt29
-rw-r--r--LICENSE27
-rw-r--r--README.md24
-rw-r--r--src/CMakeLists.txt12
-rw-r--r--src/core/CMakeLists.txt6
-rw-r--r--src/core/index_types.h27
-rw-r--r--src/error/CMakeLists.txt6
-rw-r--r--src/error/error.h143
-rw-r--r--src/filter/CMakeLists.txt6
-rw-r--r--src/filter/filter.c465
-rw-r--r--src/filter/filter.h25
-rw-r--r--src/filter/filter_types.h22
-rw-r--r--src/main.c54
-rw-r--r--src/parameters/CMakeLists.txt7
-rw-r--r--src/parameters/parameters.c145
-rw-r--r--src/parameters/parameters.h28
-rw-r--r--src/parameters/parameters_getters.c25
-rw-r--r--src/parameters/parameters_types.h15
-rw-r--r--src/pervasive.h28
-rw-r--r--src/server/CMakeLists.txt15
-rw-r--r--src/server/server.c101
-rw-r--r--src/server/server.h83
-rw-r--r--src/server/server_create_socket.c195
-rw-r--r--src/server/server_finalize.c42
-rw-r--r--src/server/server_initialize.c99
-rw-r--r--src/server/server_joining_threads.c38
-rw-r--r--src/server/server_new_connection.c180
-rw-r--r--src/server/server_signal.c41
-rw-r--r--src/server/server_types.h75
-rw-r--r--src/server/server_wait_for_event.c62
-rw-r--r--src/server/server_worker.c73
31 files changed, 2098 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..7825260
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,29 @@
+cmake_minimum_required(VERSION 3.0 FATAL_ERROR)
+
+project("JabberHive - Limiter")
+
+include(FindPkgConfig)
+
+add_subdirectory(src)
+add_definitions(-D_POSIX_SOURCE)
+add_definitions(-D_POSIX_C_SOURCE=200809L)
+
+set(CMAKE_C_FLAGS $ENV{CFLAGS})
+if(CMAKE_COMPILER_IS_GNUCC)
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -Wpedantic -Wconversion")
+ message(STATUS "GNUCC detected. Adding '-O3' parameter.")
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3")
+endif()
+message(STATUS "CFLAGS=${CMAKE_C_FLAGS}")
+
+# ${SRC_FILES} is recursively defined in the subdirectories.
+# Each subdirectory only adds the source files that are present at its level.
+add_executable(jabberhive-limiter ${SRC_FILES})
+set_property(TARGET jabberhive-limiter PROPERTY C_STANDARD 99)
+set_property(TARGET jabberhive-limiter PROPERTY C_STANDARD_REQUIRED ON)
+
+find_package(Threads)
+target_link_libraries(jabberhive-limiter ${CMAKE_THREAD_LIBS_INIT})
+
+## OPTION HANDLING #############################################################
+# TODO
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..f546fa2
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2017, Nathanaƫl Sensfelder
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+* Neither the name of JabberHive nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b4d5d39
--- /dev/null
+++ b/README.md
@@ -0,0 +1,24 @@
+## What is JabberHive?
+JabberHive is a modular ChatBot system. All "modules" are in fact separate
+programs linked together using the JabberHive Protocol. Please refer to the
+protocol for more information.
+
+## Component Description
+* Filter for a JabberHive network.
+* Randomly transforms ?RLR queries into ?RL ones.
+* The transformation chance is a run-time parameter.
+
+## JabberHive Protocol Compatibility
+* **Protocol Version(s):** 1.
+* **Inbound Connections:** Multiple.
+* **Outbound Connections:** Multiple.
+* **Pipelining:** No.
+* **Behavior:** Gateway.
+
+## Notes
+* Does not correctly reply to Pipelining & Protocol Version requests.
+
+## Dependencies
+- POSIX compliant OS.
+- C compiler (with C99 support).
+- CMake.
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
new file mode 100644
index 0000000..0b242bc
--- /dev/null
+++ b/src/CMakeLists.txt
@@ -0,0 +1,12 @@
+add_subdirectory(error)
+add_subdirectory(core)
+add_subdirectory(filter)
+add_subdirectory(server)
+add_subdirectory(parameters)
+
+set(
+ SRC_FILES ${SRC_FILES}
+ ${CMAKE_CURRENT_SOURCE_DIR}/main.c
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt
new file mode 100644
index 0000000..fa07534
--- /dev/null
+++ b/src/core/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(
+ SRC_FILES ${SRC_FILES}
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
+
diff --git a/src/core/index_types.h b/src/core/index_types.h
new file mode 100644
index 0000000..9131569
--- /dev/null
+++ b/src/core/index_types.h
@@ -0,0 +1,27 @@
+#ifndef _JH_CORE_INDEX_TYPES_H_
+#define _JH_CORE_INDEX_TYPES_H_
+
+#include "../pervasive.h"
+
+/*
+ * JH_index is a replacement for size_t. As many indices are stored for every
+ * word learned, having control over which type of variable is used to represent
+ * those indices lets us scale the RAM usage.
+ */
+
+#include <limits.h>
+#include <stdint.h>
+
+/* Must be unsigned. */
+typedef unsigned int JH_index;
+
+/* Must be > 0. */
+#define JH_INDEX_MAX UINT_MAX
+
+#ifndef JH_RUNNING_FRAMA_C
+ #if (JH_INDEX_MAX > SIZE_MAX)
+ #error "JH_index should not be able to go higher than a size_t variable."
+ #endif
+#endif
+
+#endif
diff --git a/src/error/CMakeLists.txt b/src/error/CMakeLists.txt
new file mode 100644
index 0000000..fa07534
--- /dev/null
+++ b/src/error/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(
+ SRC_FILES ${SRC_FILES}
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
+
diff --git a/src/error/error.h b/src/error/error.h
new file mode 100644
index 0000000..145c838
--- /dev/null
+++ b/src/error/error.h
@@ -0,0 +1,143 @@
+#ifndef _JH_ERROR_ERROR_H_
+#define _JH_ERROR_ERROR_H_
+
+#include <stdio.h>
+
+#include "../pervasive.h"
+
+#ifndef JH_DEBUG_PROGRAM_FLOW
+ #define JH_DEBUG_PROGRAM_FLOW (0 || JH_DEBUG_ALL)
+#endif
+
+#ifndef JH_DEBUG_CONFIG
+ #define JH_DEBUG_CONFIG (0 || JH_DEBUG_ALL)
+#endif
+
+#ifndef JH_DEBUG_LEARNING
+ #define JH_DEBUG_LEARNING (0 || JH_DEBUG_ALL)
+#endif
+
+#ifndef JH_DEBUG_NETWORK
+ #define JH_DEBUG_NETWORK 1
+#endif
+
+#ifndef JH_DEBUG_NETWORK
+ #define JH_DEBUG_NETWORK (0 || JH_DEBUG_ALL)
+#endif
+
+#define JH_ENABLE_WARNINGS_OUTPUT 1
+#define JH_ENABLE_RUNTIME_ERRORS_OUTPUT 1
+#define JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT 1
+#define JH_ENABLE_FATAL_ERROR_OUTPUT 1
+
+#ifdef JH_ENABLE_ERROR_LOCATION
+ #define JH_LOCATION " [" __FILE__ "][" JH_TO_STRING(__LINE__) "]"
+#else
+ #define JH_LOCATION ""
+#endif
+
+#define JH_PRINT_STDERR(io, symbol, str, ...)\
+ fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n", __VA_ARGS__);
+
+/*
+ * Given that we use preprocessor contants as flags, we can expect the compilers
+ * to remove the test condition for disabled flags. No need to be shy about
+ * allowing many debug options.
+ */
+
+#define JH_DEBUG(io, flag, str, ...)\
+ JH_ISOLATE\
+ (\
+ if (flag)\
+ {\
+ JH_PRINT_STDERR(io, "D", str, __VA_ARGS__);\
+ }\
+ )
+
+
+#define JH_WARNING(io, str, ...)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_WARNINGS_OUTPUT)\
+ {\
+ JH_PRINT_STDERR(io, "W", str, __VA_ARGS__);\
+ }\
+ )
+
+#define JH_ERROR(io, str, ...)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\
+ {\
+ JH_PRINT_STDERR(io, "E", str, __VA_ARGS__);\
+ }\
+ )
+
+#define JH_PROG_ERROR(io, str, ...)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\
+ {\
+ JH_PRINT_STDERR(io, "P", str, __VA_ARGS__);\
+ }\
+ )
+
+#define JH_FATAL(io, str, ...)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_FATAL_ERROR_OUTPUT)\
+ {\
+ JH_PRINT_STDERR(io, "F", str, __VA_ARGS__);\
+ }\
+ )
+
+/* For outputs without dynamic content (static). ******************************/
+
+#define JH_PRINT_S_STDERR(io, symbol, str)\
+ fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n");
+
+#define JH_S_DEBUG(io, flag, str)\
+ JH_ISOLATE\
+ (\
+ if (flag)\
+ {\
+ JH_PRINT_S_STDERR(io, "D", str);\
+ }\
+ )
+
+#define JH_S_WARNING(io, str)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_WARNINGS_OUTPUT)\
+ {\
+ JH_PRINT_S_STDERR(io, "W", str);\
+ }\
+ )
+
+#define JH_S_ERROR(io, str)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\
+ {\
+ JH_PRINT_S_STDERR(io, "E", str);\
+ }\
+ )
+
+#define JH_S_PROG_ERROR(io, str)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\
+ {\
+ JH_PRINT_S_STDERR(io, "P", str);\
+ }\
+ )
+
+#define JH_S_FATAL(io, str)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_FATAL_ERROR_OUTPUT)\
+ {\
+ JH_PRINT_S_STDERR(io, "F", str);\
+ }\
+ )
+#endif
diff --git a/src/filter/CMakeLists.txt b/src/filter/CMakeLists.txt
new file mode 100644
index 0000000..c8add44
--- /dev/null
+++ b/src/filter/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(
+ SRC_FILES ${SRC_FILES}
+ ${CMAKE_CURRENT_SOURCE_DIR}/filter.c
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
diff --git a/src/filter/filter.c b/src/filter/filter.c
new file mode 100644
index 0000000..20adb79
--- /dev/null
+++ b/src/filter/filter.c
@@ -0,0 +1,465 @@
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "../pervasive.h"
+#include "../error/error.h"
+#include "../parameters/parameters.h"
+
+#include "filter.h"
+
+static const char REQUEST_TEMPLATE[] = {'?', 'R', 'L', 'R', ' '};
+
+static int connect_downstream
+(
+ struct JH_limiter_filter filter [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ struct sockaddr_un addr;
+
+ const int old_errno = errno;
+
+ errno = 0;
+
+ if ((filter->downstream_socket = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to create socket: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ memset((void *) &addr, (int) 0, sizeof(addr));
+
+ addr.sun_family = AF_UNIX;
+
+ strncpy
+ (
+ (char *) addr.sun_path,
+ JH_parameters_get_dest_socket_name(params),
+ (sizeof(addr.sun_path) - ((size_t) 1))
+ );
+
+ errno = 0;
+
+ if
+ (
+ connect
+ (
+ filter->downstream_socket,
+ (struct sockaddr *) &addr,
+ sizeof(addr)
+ )
+ == -1
+ )
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to connect to address: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ close(filter->downstream_socket);
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ filter->state = JH_LIMITER_IS_LISTENING_UPSTREAM;
+
+ return 0;
+}
+
+static int listen_to_upstream
+(
+ struct JH_limiter_filter filter [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ ssize_t io_bytes;
+ const int old_errno = errno;
+
+ for (;;)
+ {
+ errno = 0;
+ io_bytes =
+ read
+ (
+ filter->upstream_socket,
+ (void *) (filter->buffer + filter->buffer_index),
+ sizeof(char)
+ );
+
+ if (io_bytes == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Upstream read error %d (\"%s\").",
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ if
+ (
+ filter->buffer[filter->buffer_index]
+ != REQUEST_TEMPLATE[filter->buffer_index]
+ )
+ {
+ filter->buffer_index += 1;
+ filter->state = JH_LIMITER_IS_SENDING_DOWNSTREAM;
+
+ return 0;
+ }
+
+ filter->buffer_index += 1;
+
+ if (filter->buffer_index == 5)
+ {
+ if (rand() >= JH_parameters_get_reply_rate(params))
+ {
+ strncpy
+ (
+ filter->buffer,
+ "?RL ",
+ (((size_t) 4) * sizeof(char))
+ );
+
+ filter->buffer_index = 4;
+ }
+
+ filter->state = JH_LIMITER_IS_SENDING_DOWNSTREAM;
+
+ return 0;
+ }
+ }
+
+ return -1;
+}
+
+static int send_downstream
+(
+ struct JH_limiter_filter filter [const restrict static 1]
+)
+{
+ ssize_t io_bytes;
+ const int old_errno = errno;
+
+ if (filter->buffer_index > 0)
+ {
+ errno = 0;
+
+ io_bytes =
+ write
+ (
+ filter->downstream_socket,
+ (void *) filter->buffer,
+ (((size_t) filter->buffer_index) * sizeof(char))
+ );
+
+ filter->buffer_index = 0;
+
+ if (io_bytes == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Downstream write error %d (\"%s\").",
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+ }
+
+ for (;;)
+ {
+ errno = 0;
+ io_bytes =
+ read
+ (
+ filter->upstream_socket,
+ (void *) filter->buffer,
+ sizeof(char)
+ );
+
+ if (io_bytes == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Upstream read error %d (\"%s\").",
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = 0;
+
+ io_bytes = write
+ (
+ filter->downstream_socket,
+ (void *) filter->buffer,
+ sizeof(char)
+ );
+
+ if (io_bytes == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Upstream write error %d (\"%s\").",
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ if (filter->buffer[0] == '\n')
+ {
+ filter->state = JH_LIMITER_IS_SENDING_UPSTREAM;
+
+ return 0;
+ }
+ }
+}
+
+static int send_upstream
+(
+ struct JH_limiter_filter filter [const restrict static 1]
+)
+{
+ ssize_t io_bytes;
+ const int old_errno = errno;
+
+ for (;;)
+ {
+ errno = 0;
+
+ io_bytes =
+ read
+ (
+ filter->downstream_socket,
+ (void *) filter->buffer,
+ sizeof(char)
+ );
+
+ if (io_bytes == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Downstream read error %d (\"%s\").",
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = 0;
+
+ io_bytes = write
+ (
+ filter->upstream_socket,
+ (void *) filter->buffer,
+ sizeof(char)
+ );
+
+ if (io_bytes == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Upstream write error %d (\"%s\").",
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ switch (filter->buffer_index)
+ {
+ case -1:
+ if (filter->buffer[0] == '\n')
+ {
+ filter->buffer_index = 0;
+ }
+ break;
+
+ case 0:
+ if (filter->buffer[0] == '!')
+ {
+ filter->buffer_index = 1;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case 1:
+ if ((filter->buffer[0] == 'N') || (filter->buffer[0] == 'P'))
+ {
+ filter->buffer_index = 2;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case 2:
+ if (filter->buffer[0] == ' ')
+ {
+ filter->buffer_index = 3;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case 3:
+ if (filter->buffer[0] == '\n')
+ {
+ filter->buffer_index = 0;
+ filter->state = JH_LIMITER_IS_LISTENING_UPSTREAM;
+
+ return 0;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ default:
+ JH_PROG_ERROR
+ (
+ stderr,
+ "Invalid value for 'filter->buffer_index': %d.",
+ filter->buffer_index
+ );
+
+ filter->buffer_index = 0;
+
+ return -1;
+ }
+ }
+
+ return -1;
+}
+
+/******************************************************************************/
+/** EXPORTED ******************************************************************/
+/******************************************************************************/
+
+int JH_limiter_filter_step
+(
+ struct JH_limiter_filter filter [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ switch (filter->state)
+ {
+ case JH_LIMITER_IS_CONNECTING:
+ JH_DEBUG(stderr, 1, "<CONNECTING> (index: %d)", filter->buffer_index);
+ return connect_downstream(filter, params);
+
+ case JH_LIMITER_IS_LISTENING_UPSTREAM:
+ JH_DEBUG(stderr, 1, "<LISTENING_UP> (index: %d)", filter->buffer_index);
+ return listen_to_upstream(filter, params);
+
+ case JH_LIMITER_IS_SENDING_DOWNSTREAM:
+ JH_DEBUG(stderr, 1, "<SENDING_DOWN> (index: %d)", filter->buffer_index);
+ return send_downstream(filter);
+
+ case JH_LIMITER_IS_SENDING_UPSTREAM:
+ JH_DEBUG(stderr, 1, "<SENDING_UP> (index: %d)", filter->buffer_index);
+ return send_upstream(filter);
+
+ default:
+ return -1;
+ }
+}
+
+int JH_limiter_filter_initialize
+(
+ struct JH_limiter_filter filter [const restrict static 1],
+ const int upstream_socket
+)
+{
+ filter->state = JH_LIMITER_IS_CONNECTING;
+ filter->buffer_index = 0;
+ filter->upstream_socket = upstream_socket;
+ filter->downstream_socket = -1;
+
+ return 0;
+}
+
+void JH_limiter_filter_finalize
+(
+ struct JH_limiter_filter filter [const restrict static 1]
+)
+{
+
+ if (filter->upstream_socket != -1)
+ {
+ close(filter->upstream_socket);
+
+ filter->upstream_socket = -1;
+ }
+
+ if (filter->downstream_socket != -1)
+ {
+ close(filter->downstream_socket);
+
+ filter->downstream_socket = -1;
+ }
+}
diff --git a/src/filter/filter.h b/src/filter/filter.h
new file mode 100644
index 0000000..5c21791
--- /dev/null
+++ b/src/filter/filter.h
@@ -0,0 +1,25 @@
+#ifndef _JH_LIMITER_FILTER_H_
+#define _JH_LIMITER_FILTER_H_
+
+#include "../parameters/parameters_types.h"
+
+#include "filter_types.h"
+
+int JH_limiter_filter_initialize
+(
+ struct JH_limiter_filter filter [const restrict static 1],
+ const int upstream_socket
+);
+
+int JH_limiter_filter_step
+(
+ struct JH_limiter_filter filter [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+);
+
+void JH_limiter_filter_finalize
+(
+ struct JH_limiter_filter filter [const restrict static 1]
+);
+
+#endif
diff --git a/src/filter/filter_types.h b/src/filter/filter_types.h
new file mode 100644
index 0000000..b2c5ecb
--- /dev/null
+++ b/src/filter/filter_types.h
@@ -0,0 +1,22 @@
+#ifndef _JH_LIMITER_FILTER_TYPES_H_
+#define _JH_LIMITER_FILTER_TYPES_H_
+
+#define JH_LIMITER_FILTER_BUFFER_SIZE 5
+enum JH_limiter_filter_state
+{
+ JH_LIMITER_IS_CONNECTING,
+ JH_LIMITER_IS_LISTENING_UPSTREAM,
+ JH_LIMITER_IS_SENDING_DOWNSTREAM,
+ JH_LIMITER_IS_SENDING_UPSTREAM
+};
+
+struct JH_limiter_filter
+{
+ enum JH_limiter_filter_state state;
+ char buffer[JH_LIMITER_FILTER_BUFFER_SIZE];
+ int buffer_index;
+ int upstream_socket;
+ int downstream_socket;
+};
+
+#endif
diff --git a/src/main.c b/src/main.c
new file mode 100644
index 0000000..9216104
--- /dev/null
+++ b/src/main.c
@@ -0,0 +1,54 @@
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+
+#include "error/error.h"
+#include "parameters/parameters.h"
+#include "server/server.h"
+
+#include "pervasive.h"
+
+static void print_help (const char runnable [const restrict static 1])
+{
+ printf
+ (
+ "JabberHive - Limiter\n"
+ "Software Version %d\n"
+ "Protocol Version %d\n"
+ "\nUsages:\n"
+ " JH GATEWAY:\t%s SOCKET_NAME DESTINATION REPLY_RATE\n"
+ " SHOW HELP:\tAnything else.\n"
+ "\nParameters:\n"
+ " SOCKET_NAME:\tValid UNIX socket.\n"
+ " DESTINATION:\tValid UNIX socket.\n"
+ " REPLY_RATE:\tInteger [0,100].\n",
+ JH_PROGRAM_VERSION,
+ JH_PROTOCOL_VERSION,
+ runnable
+ );
+}
+
+
+int main (int const argc, const char * argv [const static argc])
+{
+ struct JH_parameters params;
+
+ if (JH_parameters_initialize(&params, argc, argv) < 0)
+ {
+ print_help(argv[0]);
+
+ return -1;
+ }
+
+ if (JH_server_main(&params) < 0)
+ {
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/src/parameters/CMakeLists.txt b/src/parameters/CMakeLists.txt
new file mode 100644
index 0000000..2aa7ece
--- /dev/null
+++ b/src/parameters/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(
+ SRC_FILES ${SRC_FILES}
+ ${CMAKE_CURRENT_SOURCE_DIR}/parameters.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/parameters_getters.c
+)
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
+
diff --git a/src/parameters/parameters.c b/src/parameters/parameters.c
new file mode 100644
index 0000000..b856c3e
--- /dev/null
+++ b/src/parameters/parameters.c
@@ -0,0 +1,145 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <limits.h>
+
+#include "../error/error.h"
+
+#include "parameters.h"
+
+static int parse_reply_rate
+(
+ struct JH_parameters param [const restrict static 1],
+ const char argv [const restrict]
+)
+{
+ long long int input;
+ const int old_errno = errno;
+
+ errno = 0;
+
+ input = strtoll(argv, (char **) NULL, 10);
+
+ if
+ (
+ (errno != 0)
+ || (input > (long long int) 100)
+ || (input < 0)
+ )
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Invalid or value for parameter 'reply_rate', accepted "
+ "range is "
+ "[0, %hu] (integer).",
+ 100
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ param->reply_rate =
+ (int)
+ (
+ ((double) RAND_MAX)
+ * (((double) input) * ((double) 0.01))
+ );
+
+ errno = old_errno;
+
+ return 0;
+}
+
+static void set_default_to_all_fields
+(
+ struct JH_parameters param [const restrict static 1]
+)
+{
+ param->reply_rate = -1;
+ param->socket_name = (const char *) NULL;
+ param->dest_socket_name = (const char *) NULL;
+}
+
+static int is_valid
+(
+ struct JH_parameters param [const restrict static 1]
+)
+{
+ int valid;
+
+ valid = 1;
+
+ if (param->socket_name == (const char *) NULL)
+ {
+ JH_S_FATAL(stderr, "Missing parameter: This entity's socket name.");
+
+ valid = 0;
+ }
+
+ if (param->dest_socket_name == (const char *) NULL)
+ {
+ JH_S_FATAL(stderr, "Missing parameter: The destination's socket name.");
+
+ valid = 0;
+ }
+
+ if (param->reply_rate == -1)
+ {
+ JH_S_FATAL(stderr, "Missing parameter: The reply rate.");
+
+ valid = 0;
+ }
+
+ return valid;
+}
+
+static void set_parameters
+(
+ struct JH_parameters param [const restrict static 1],
+ int const argc,
+ const char * argv [const static argc]
+)
+{
+ if (argc < 2)
+ {
+ return;
+ }
+
+ param->socket_name = argv[1];
+
+ if (argc < 3)
+ {
+ return;
+ }
+
+ param->dest_socket_name = argv[2];
+
+ if (argc < 4)
+ {
+ return;
+ }
+
+ parse_reply_rate(param, argv[3]);
+}
+
+int JH_parameters_initialize
+(
+ struct JH_parameters param [const restrict static 1],
+ int const argc,
+ const char * argv [const static argc]
+)
+{
+ set_default_to_all_fields(param);
+
+ set_parameters(param, argc, argv);
+
+ if (!is_valid(param))
+ {
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/src/parameters/parameters.h b/src/parameters/parameters.h
new file mode 100644
index 0000000..107350e
--- /dev/null
+++ b/src/parameters/parameters.h
@@ -0,0 +1,28 @@
+#ifndef _JH_CLI_PARAMETERS_H_
+#define _JH_CLI_PARAMETERS_H_
+
+#include "parameters_types.h"
+
+int JH_parameters_initialize
+(
+ struct JH_parameters param [const restrict static 1],
+ int const argc,
+ const char * argv [const static argc]
+);
+
+const int JH_parameters_get_reply_rate
+(
+ const struct JH_parameters param [const restrict static 1]
+);
+
+const char * JH_parameters_get_socket_name
+(
+ const struct JH_parameters param [const restrict static 1]
+);
+
+const char * JH_parameters_get_dest_socket_name
+(
+ const struct JH_parameters param [const restrict static 1]
+);
+
+#endif
diff --git a/src/parameters/parameters_getters.c b/src/parameters/parameters_getters.c
new file mode 100644
index 0000000..0b3ec9e
--- /dev/null
+++ b/src/parameters/parameters_getters.c
@@ -0,0 +1,25 @@
+#include "parameters.h"
+
+const int JH_parameters_get_reply_rate
+(
+ const struct JH_parameters param [const restrict static 1]
+)
+{
+ return param->reply_rate;
+}
+
+const char * JH_parameters_get_socket_name
+(
+ const struct JH_parameters param [const restrict static 1]
+)
+{
+ return param->socket_name;
+}
+
+const char * JH_parameters_get_dest_socket_name
+(
+ const struct JH_parameters param [const restrict static 1]
+)
+{
+ return param->dest_socket_name;
+}
diff --git a/src/parameters/parameters_types.h b/src/parameters/parameters_types.h
new file mode 100644
index 0000000..6584b3e
--- /dev/null
+++ b/src/parameters/parameters_types.h
@@ -0,0 +1,15 @@
+#ifndef _JH_CLI_PARAMETERS_TYPES_H_
+#define _JH_CLI_PARAMETERS_TYPES_H_
+
+#define JH_PARAMETERS_COUNT 3
+
+struct JH_parameters
+{
+ int reply_rate;
+
+ /* JH **********************************************************************/
+ const char * restrict socket_name;
+ const char * restrict dest_socket_name;
+};
+
+#endif
diff --git a/src/pervasive.h b/src/pervasive.h
new file mode 100644
index 0000000..27d832d
--- /dev/null
+++ b/src/pervasive.h
@@ -0,0 +1,28 @@
+#ifndef _JH_PERVASIVE_H_
+#define _JH_PERVASIVE_H_
+
+#include <string.h>
+
+#define JH_PROGRAM_VERSION 1
+#define JH_PROTOCOL_VERSION 1
+
+#ifdef __FRAMA_C__
+ #define JH_RUNNING_FRAMA_C 1
+#endif
+
+#define JH_DEBUG_ALL 1
+
+#ifndef JH_DEBUG_ALL
+ #define JH_DEBUG_ALL 0
+#endif
+
+#define JH__TO_STRING(x) #x
+#define JH_TO_STRING(x) JH__TO_STRING(x)
+#define JH_ISOLATE(a) do {a} while (0)
+
+/* strncmp stops at '\0' and strlen does not count '\0'. */
+#define JH_IS_PREFIX(a, b) (strncmp(a, b, strlen(a)) == 0)
+
+#define JH_STRING_EQUALS(a, b) (strcmp(a, b) == 0)
+
+#endif
diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt
new file mode 100644
index 0000000..9ade789
--- /dev/null
+++ b/src/server/CMakeLists.txt
@@ -0,0 +1,15 @@
+set(
+ SRC_FILES ${SRC_FILES}
+ ${CMAKE_CURRENT_SOURCE_DIR}/server.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_create_socket.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_finalize.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_initialize.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_joining_threads.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_new_connection.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_signal.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_wait_for_event.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_worker.c
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
+
diff --git a/src/server/server.c b/src/server/server.c
new file mode 100644
index 0000000..640584f
--- /dev/null
+++ b/src/server/server.c
@@ -0,0 +1,101 @@
+#include <signal.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+int JH_server_main
+(
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ struct JH_server server;
+ JH_index retries;
+
+ retries = 0;
+ /* TODO
+ if (JH_server_set_signal_handlers < 0)
+ {
+ return -1;
+ }
+ */
+
+ if (JH_server_initialize(&server, params) < 0)
+ {
+ return -1;
+ }
+
+ while (JH_server_is_running())
+ {
+ switch (JH_server_wait_for_event(&server))
+ {
+ case 0: /* Timed out or signal'd. */
+ JH_S_DEBUG(stderr, 1, "Timed out...");
+ JH_server_handle_joining_threads(&server);
+
+ retries = 0;
+
+ break;
+
+ case 1: /* New client attempted connection. */
+ JH_S_DEBUG(stderr, 1, "New connection.");
+ JH_server_handle_joining_threads(&server);
+ (void) JH_server_handle_new_connection(&server);
+
+ retries = 0;
+
+ break;
+
+ case -1: /* Something bad happened. */
+ retries += 1;
+
+ if (retries == JH_SERVER_MAX_RETRIES)
+ {
+ JH_server_finalize(&server);
+
+ return -1;
+ }
+
+ break;
+
+ default:
+ JH_S_PROG_ERROR
+ (
+ stderr,
+ "Unexpected wait_for_event return value."
+ );
+
+ break;
+ }
+ }
+
+ /* Waiting for the threads to join... */
+ while (server.workers.currently_running > 0)
+ {
+ switch (JH_server_wait_for_event(&server))
+ {
+ case 0: /* Timed out. */
+ case 1: /* New client attempted connection. */
+ JH_server_handle_joining_threads(&server);
+ break;
+
+ case -1: /* Something bad happened. */
+ retries += 1;
+
+ if (retries == JH_SERVER_MAX_RETRIES)
+ {
+ JH_server_finalize(&server);
+
+ return -1;
+ }
+ break;
+ }
+ }
+
+ JH_server_finalize(&server);
+
+ return 0;
+}
+
diff --git a/src/server/server.h b/src/server/server.h
new file mode 100644
index 0000000..9d7f4b1
--- /dev/null
+++ b/src/server/server.h
@@ -0,0 +1,83 @@
+#ifndef _JH_SERVER_SERVER_H_
+#define _JH_SERVER_SERVER_H_
+
+#include "../parameters/parameters_types.h"
+
+#include "server_types.h"
+
+int JH_server_initialize
+(
+ struct JH_server server [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+);
+
+int JH_server_socket_open
+(
+ struct JH_server_socket server_socket [const restrict static 1],
+ const char socket_name [const restrict static 1]
+);
+
+void JH_server_request_termination (void);
+int JH_server_is_running (void);
+int JH_server_set_signal_handlers (void);
+
+int JH_server_main
+(
+ const struct JH_parameters params [const restrict static 1]
+);
+
+void JH_server_finalize (struct JH_server [const restrict static 1]);
+
+int JH_server_wait_for_event
+(
+ struct JH_server server [const restrict static 1]
+);
+
+void JH_server_handle_joining_threads
+(
+ struct JH_server server [const restrict static 1]
+);
+
+int JH_server_handle_new_connection
+(
+ struct JH_server server [const restrict static 1]
+);
+
+void * JH_server_worker_main (void * input);
+
+int JH_server_worker_receive
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_handle_request
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_send_confirm_pipelining_support
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_send_confirm_protocol_version
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_send_positive
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_send_negative
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_send_generated_reply
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+#endif
diff --git a/src/server/server_create_socket.c b/src/server/server_create_socket.c
new file mode 100644
index 0000000..5e0c00b
--- /dev/null
+++ b/src/server/server_create_socket.c
@@ -0,0 +1,195 @@
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include "../error/error.h"
+
+#include "server.h"
+
+static int create_socket (int result [const restrict static 1])
+{
+ const int old_errno = errno;
+
+ errno = 0;
+ *result = socket(AF_UNIX, SOCK_STREAM, 0);
+
+ if (*result == -1)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to create server socket: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+static int bind_socket
+(
+ const int socket,
+ const char socket_name [const restrict static 1]
+)
+{
+ struct sockaddr_un addr;
+ const int old_errno = errno;
+
+ errno = 0;
+ memset(&addr, 0, sizeof(struct sockaddr_un));
+
+ addr.sun_family = AF_UNIX;
+
+ strncpy
+ (
+ (void *) addr.sun_path,
+ (const void *) socket_name,
+ (sizeof(addr.sun_path) - 1)
+ );
+
+ errno = old_errno;
+
+ if
+ (
+ bind
+ (
+ socket,
+ (const struct sockaddr *) &addr,
+ (socklen_t) sizeof(struct sockaddr_un)
+ ) != 0
+ )
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to bind server socket to %s: %s.",
+ socket_name,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+static int set_socket_to_unblocking (const int socket)
+{
+ int current_flags;
+ const int old_errno = errno;
+
+ current_flags = fcntl(socket, F_GETFD);
+
+ if (current_flags == -1)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to get server socket properties: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ /* current_flags = current_flags & (~O_NONBLOCK); */
+
+ current_flags = fcntl(socket, F_SETFD, (current_flags | O_NONBLOCK));
+
+ if (current_flags == -1)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to set server socket properties: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -2;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+static int set_socket_as_listener (const int socket)
+{
+ const int old_errno = errno;
+
+ if (listen(socket, JH_SERVER_SOCKET_LISTEN_BACKLOG) != 0)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to set server socket properties: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+int JH_server_socket_open
+(
+ struct JH_server_socket server_socket [const restrict static 1],
+ const char socket_name [const restrict static 1]
+)
+{
+ printf("\"%s\"\n", socket_name);
+ if (create_socket(&(server_socket->file_descriptor)) < 0)
+ {
+ return -1;
+ }
+
+ if (bind_socket(server_socket->file_descriptor, socket_name) < 0)
+ {
+ close(server_socket->file_descriptor);
+
+ return -1;
+ }
+
+ if (set_socket_to_unblocking(server_socket->file_descriptor) < 0)
+ {
+ close(server_socket->file_descriptor);
+
+ return -1;
+ }
+
+ if (set_socket_as_listener(server_socket->file_descriptor) < 0)
+ {
+ close(server_socket->file_descriptor);
+
+ return -1;
+ }
+
+ FD_ZERO(&(server_socket->as_a_set));
+ FD_SET(server_socket->file_descriptor, &(server_socket->as_a_set));
+
+ return 0;
+}
diff --git a/src/server/server_finalize.c b/src/server/server_finalize.c
new file mode 100644
index 0000000..25ea672
--- /dev/null
+++ b/src/server/server_finalize.c
@@ -0,0 +1,42 @@
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+static void finalize_thread_collection
+(
+ struct JH_server_thread_collection workers [const restrict static 1]
+)
+{
+ free((void *) workers->threads);
+
+ workers->threads_capacity = 0;
+
+ pthread_mutex_destroy(&(workers->mutex));
+ pthread_barrier_destroy(&(workers->barrier));
+
+ workers->currently_running = 0;
+}
+
+static void finalize_socket
+(
+ struct JH_server_socket socket [const restrict static 1]
+)
+{
+ FD_ZERO(&(socket->as_a_set));
+
+ close(socket->file_descriptor);
+
+ socket->file_descriptor = -1;
+}
+
+void JH_server_finalize
+(
+ struct JH_server server [const restrict static 1]
+)
+{
+ finalize_thread_collection(&(server->workers));
+ finalize_socket(&(server->socket));
+}
diff --git a/src/server/server_initialize.c b/src/server/server_initialize.c
new file mode 100644
index 0000000..a125046
--- /dev/null
+++ b/src/server/server_initialize.c
@@ -0,0 +1,99 @@
+#include <signal.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+static int initialize_worker_collection
+(
+ struct JH_server_thread_collection c [const restrict static 1]
+)
+{
+ int error;
+
+ c->threads = (struct JH_server_thread_data *) NULL;
+ c->threads_capacity = 0;
+ c->currently_running = 0;
+
+ error =
+ pthread_mutex_init
+ (
+ &(c->mutex),
+ (const pthread_mutexattr_t *) NULL
+ );
+
+ if (error != 0)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to initialize worker collection's mutex: %s.",
+ strerror(error)
+ );
+
+ return -1;
+ }
+
+ error =
+ pthread_barrier_init
+ (
+ &(c->barrier),
+ (const pthread_barrierattr_t *) NULL,
+ 2
+ );
+
+ if (error != 0)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "[F] Unable to initialize worker collection's barrier: %s.",
+ strerror(error)
+ );
+
+ return -1;
+ }
+
+ return 0;
+}
+
+void initialize_thread_parameters
+(
+ struct JH_server server [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ server->thread_params.thread_collection = &(server->workers);
+ server->thread_params.server_params = params;
+ server->thread_params.socket = -1;
+}
+
+int JH_server_initialize
+(
+ struct JH_server server [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ if (initialize_worker_collection(&(server->workers)) < 0)
+ {
+ return -1;
+ }
+
+ if
+ (
+ JH_server_socket_open
+ (
+ &(server->socket),
+ JH_parameters_get_socket_name(params)
+ ) < 0
+ )
+ {
+ return -2;
+ }
+
+ initialize_thread_parameters(server, params);
+
+ return 0;
+}
diff --git a/src/server/server_joining_threads.c b/src/server/server_joining_threads.c
new file mode 100644
index 0000000..e1d92ca
--- /dev/null
+++ b/src/server/server_joining_threads.c
@@ -0,0 +1,38 @@
+#include <sys/socket.h>
+
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+void JH_server_handle_joining_threads
+(
+ struct JH_server server [const restrict static 1]
+)
+{
+ JH_index i;
+
+ pthread_mutex_lock(&(server->workers.mutex));
+
+ for (i = 0; i < server->workers.threads_capacity; ++i)
+ {
+ if (server->workers.threads[i].state == JH_SERVER_JOINING_THREAD)
+ {
+ JH_DEBUG(stderr, 1, "Joining thread %u", i);
+
+ pthread_join(server->workers.threads[i].posix_id, (void **) NULL);
+
+ server->workers.threads[i].state = JH_SERVER_NO_THREAD;
+
+ server->workers.currently_running -= 1;
+ }
+ }
+
+ pthread_mutex_unlock(&(server->workers.mutex));
+}
diff --git a/src/server/server_new_connection.c b/src/server/server_new_connection.c
new file mode 100644
index 0000000..23a2770
--- /dev/null
+++ b/src/server/server_new_connection.c
@@ -0,0 +1,180 @@
+#include <sys/socket.h>
+
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+static int get_new_socket (struct JH_server server [const restrict static 1])
+{
+ const int old_errno = errno;
+
+ server->thread_params.socket =
+ accept
+ (
+ server->socket.file_descriptor,
+ (struct sockaddr *) NULL,
+ (socklen_t *) NULL
+ );
+
+ if (server->thread_params.socket == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Unable to accept on the server's socket: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+static int get_new_thread (struct JH_server server [const restrict static 1])
+{
+ struct JH_server_thread_data * new_threads;
+ JH_index i;
+
+ pthread_mutex_lock(&(server->workers.mutex));
+
+ for (i = 0; i < server->workers.threads_capacity; ++i)
+ {
+ if (server->workers.threads[i].state == JH_SERVER_NO_THREAD)
+ {
+ server->thread_params.thread_id = i;
+
+ pthread_mutex_unlock(&(server->workers.mutex));
+
+ return 0;
+ }
+ }
+
+ if
+ (
+ (server->workers.threads_capacity == JH_INDEX_MAX)
+ ||
+ (
+ (size_t) (server->workers.threads_capacity + 1)
+ > (SIZE_MAX / sizeof(struct JH_server_thread_data))
+ )
+ )
+ {
+ JH_S_ERROR
+ (
+ stderr,
+ "Maximum number of concurrent threads attained, unable to add more."
+ );
+
+ pthread_mutex_unlock(&(server->workers.mutex));
+
+ return -1;
+ }
+
+ server->thread_params.thread_id = server->workers.threads_capacity;
+ server->workers.threads_capacity += 1;
+
+ new_threads =
+ (struct JH_server_thread_data *) realloc
+ (
+ server->workers.threads,
+ (
+ sizeof(struct JH_server_thread_data)
+ * ((size_t) server->workers.threads_capacity)
+ )
+ );
+
+ if (new_threads == ((struct JH_server_thread_data *) NULL))
+ {
+ JH_S_ERROR
+ (
+ stderr,
+ "Reallocation of the threads' data list failed."
+ );
+
+ pthread_mutex_unlock(&(server->workers.mutex));
+
+ return -1;
+ }
+
+ server->workers.threads = new_threads;
+
+ pthread_mutex_unlock(&(server->workers.mutex));
+
+ return 0;
+}
+
+static int spawn_thread (struct JH_server server [const restrict static 1])
+{
+ const JH_index thread_id = server->thread_params.thread_id;
+ int error;
+
+ server->workers.threads[thread_id].state = JH_SERVER_RUNNING_THREAD;
+
+ error =
+ pthread_create
+ (
+ &(server->workers.threads[thread_id].posix_id),
+ (const pthread_attr_t *) NULL,
+ JH_server_worker_main,
+ (void *) &(server->thread_params)
+ );
+
+ if (error != 0)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Unable to spawn thread: %s.",
+ strerror(error)
+ );
+
+ server->workers.threads[thread_id].state = JH_SERVER_NO_THREAD;
+
+ return -1;
+ }
+
+ pthread_barrier_wait(&(server->workers.barrier));
+
+ server->workers.currently_running += 1;
+
+ return 0;
+}
+
+int JH_server_handle_new_connection
+(
+ struct JH_server server [const restrict static 1]
+)
+{
+ if (get_new_socket(server) < 0)
+ {
+ return -1;
+ }
+
+ if (get_new_thread(server) < 0)
+ {
+ close(server->thread_params.socket);
+
+ return -2;
+ }
+
+ if (spawn_thread(server) < 0)
+ {
+ close(server->thread_params.socket);
+
+ return -3;
+ }
+
+ return 0;
+}
diff --git a/src/server/server_signal.c b/src/server/server_signal.c
new file mode 100644
index 0000000..9361382
--- /dev/null
+++ b/src/server/server_signal.c
@@ -0,0 +1,41 @@
+#include <signal.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "server.h"
+
+static volatile char JH_SERVER_IS_RUNNING = (char) 1;
+
+static void request_termination (int const signo)
+{
+ if ((signo == SIGINT) || (signo == SIGTERM))
+ {
+ JH_server_request_termination();
+ }
+}
+
+void JH_server_request_termination (void)
+{
+ JH_SERVER_IS_RUNNING = (char) 0;
+}
+
+int JH_server_is_running (void)
+{
+ return (int) JH_SERVER_IS_RUNNING;
+}
+
+int JH_server_set_signal_handlers (void)
+{
+ /*
+ struct sigaction act;
+
+ act.sa_handler = request_termination;
+ act.sa_mask =
+ act.sa_flags =
+ act.sa_restorer =
+ */
+
+ /* TODO */
+
+ return -1;
+}
diff --git a/src/server/server_types.h b/src/server/server_types.h
new file mode 100644
index 0000000..f43e47d
--- /dev/null
+++ b/src/server/server_types.h
@@ -0,0 +1,75 @@
+#ifndef _JH_SERVER_SERVER_TYPES_H_
+#define _JH_SERVER_SERVER_TYPES_H_
+
+#include <sys/time.h>
+
+#ifndef JH_RUNNING_FRAMA_C
+ #include <pthread.h>
+#endif
+
+#include "../core/index_types.h"
+
+#include "../parameters/parameters_types.h"
+
+#include "../error/error.h"
+
+#define JH_SERVER_MAX_RETRIES 10
+#define JH_SERVER_BUFFER_SIZE 0
+
+#define JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC 5
+#define JH_SERVER_SOCKET_LISTEN_BACKLOG 5
+
+enum JH_server_thread_state
+{
+ JH_SERVER_JOINING_THREAD,
+ JH_SERVER_RUNNING_THREAD,
+ JH_SERVER_NO_THREAD
+};
+
+struct JH_server_thread_data
+{
+#ifndef JH_RUNNING_FRAMA_C
+ pthread_t posix_id;
+#endif
+ enum JH_server_thread_state state;
+};
+
+struct JH_server_thread_collection
+{
+ struct JH_server_thread_data * threads;
+ JH_index threads_capacity;
+#ifndef JH_RUNNING_FRAMA_C
+ pthread_mutex_t mutex;
+ pthread_barrier_t barrier;
+#endif
+ JH_index currently_running;
+};
+
+struct JH_server_socket
+{
+ int file_descriptor;
+ fd_set as_a_set;
+ struct timeval timeout;
+};
+
+struct JH_server_thread_parameters
+{
+ struct JH_server_thread_collection * thread_collection;
+ const struct JH_parameters * server_params;
+ JH_index thread_id;
+ int socket;
+};
+
+struct JH_server_worker
+{
+ struct JH_server_thread_parameters params;
+};
+
+struct JH_server
+{
+ struct JH_server_thread_collection workers;
+ struct JH_server_socket socket;
+ struct JH_server_thread_parameters thread_params;
+};
+
+#endif
diff --git a/src/server/server_wait_for_event.c b/src/server/server_wait_for_event.c
new file mode 100644
index 0000000..c949438
--- /dev/null
+++ b/src/server/server_wait_for_event.c
@@ -0,0 +1,62 @@
+#include <sys/select.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "../error/error.h"
+
+#include "server.h"
+
+int JH_server_wait_for_event
+(
+ struct JH_server server [const static 1]
+)
+{
+ int ready_fds;
+ const int old_errno = errno;
+ fd_set ready_to_read;
+
+ ready_to_read = server->socket.as_a_set;
+
+ /* call to select may alter timeout */
+ memset((void *) &(server->socket.timeout), 0, sizeof(struct timeval));
+
+ server->socket.timeout.tv_sec = JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC;
+
+ errno = 0;
+
+ ready_fds = select
+ (
+ (server->socket.file_descriptor + 1),
+ &ready_to_read,
+ (fd_set *) NULL,
+ (fd_set *) NULL,
+ &(server->socket.timeout)
+ );
+
+ JH_DEBUG(stderr, 1, "SELECT returned: %i, errno is %i.", ready_fds, errno);
+
+ if (errno == EINTR)
+ {
+ ready_fds = 0;
+ }
+
+ if (ready_fds == -1)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to wait on server socket: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return ready_fds;
+}
diff --git a/src/server/server_worker.c b/src/server/server_worker.c
new file mode 100644
index 0000000..12387a7
--- /dev/null
+++ b/src/server/server_worker.c
@@ -0,0 +1,73 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+
+#include "../filter/filter.h"
+
+#include "server.h"
+
+static int initialize
+(
+ struct JH_server_worker worker [const restrict static 1],
+ void * input
+)
+{
+ const int old_errno = errno;
+
+ memcpy
+ (
+ (void *) &(worker->params),
+ (const void *) input,
+ sizeof(struct JH_server_thread_parameters)
+ );
+
+ pthread_barrier_wait(&(worker->params.thread_collection->barrier));
+
+ return 0;
+}
+
+static void finalize
+(
+ struct JH_server_worker worker [const restrict static 1]
+)
+{
+ close(worker->params.socket);
+
+ pthread_mutex_lock(&(worker->params.thread_collection->mutex));
+
+ worker->params.thread_collection->threads[worker->params.thread_id].state =
+ JH_SERVER_JOINING_THREAD;
+
+ pthread_mutex_unlock(&(worker->params.thread_collection->mutex));
+}
+
+void * JH_server_worker_main (void * input)
+{
+ struct JH_limiter_filter filter;
+ struct JH_server_worker worker;
+
+ initialize(&worker, input);
+
+ if (JH_limiter_filter_initialize(&filter, worker.params.socket) < 0)
+ {
+ finalize(&worker);
+
+ return NULL;
+ }
+
+ while (JH_server_is_running())
+ {
+ if (JH_limiter_filter_step(&filter, worker.params.server_params) < 0)
+ {
+ break;
+ }
+ }
+
+ JH_limiter_filter_finalize(&filter);
+
+ finalize(&worker);
+
+ return NULL;
+}