aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNathanael Sensfelder <SpamShield0@MultiAgentSystems.org>2017-06-14 17:16:14 +0200
committerNathanael Sensfelder <SpamShield0@MultiAgentSystems.org>2017-06-14 17:16:14 +0200
commit08dca7c0f612864513354288fc676968cddfbd96 (patch)
tree050d25cde509c708f265c235a330f20aadd692e8
downloadstorage-08dca7c0f612864513354288fc676968cddfbd96.zip
storage-08dca7c0f612864513354288fc676968cddfbd96.tar.bz2
Initial commit
-rw-r--r--CMakeLists.txt30
-rw-r--r--LICENSE27
-rw-r--r--README.md25
-rw-r--r--src/CMakeLists.txt12
-rw-r--r--src/core/CMakeLists.txt7
-rw-r--r--src/core/index.c19
-rw-r--r--src/core/index.h8
-rw-r--r--src/core/index_types.h28
-rw-r--r--src/error/CMakeLists.txt6
-rw-r--r--src/error/error.h143
-rw-r--r--src/filter/CMakeLists.txt6
-rw-r--r--src/filter/filter.c368
-rw-r--r--src/filter/filter.h27
-rw-r--r--src/filter/filter_types.h20
-rw-r--r--src/main.c54
-rw-r--r--src/parameters/CMakeLists.txt7
-rw-r--r--src/parameters/parameters.c100
-rw-r--r--src/parameters/parameters.h40
-rw-r--r--src/parameters/parameters_getters.c43
-rw-r--r--src/parameters/parameters_types.h22
-rw-r--r--src/pervasive.h28
-rw-r--r--src/server/CMakeLists.txt16
-rw-r--r--src/server/server.c101
-rw-r--r--src/server/server.h98
-rw-r--r--src/server/server_create_socket.c195
-rw-r--r--src/server/server_finalize.c45
-rw-r--r--src/server/server_initialize.c182
-rw-r--r--src/server/server_joining_threads.c38
-rw-r--r--src/server/server_new_connection.c202
-rw-r--r--src/server/server_signal.c41
-rw-r--r--src/server/server_types.h87
-rw-r--r--src/server/server_wait_for_event.c62
-rw-r--r--src/server/server_worker.c274
-rw-r--r--src/server/server_worker_data_merger.c190
34 files changed, 2551 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..b35d2f1
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,30 @@
+cmake_minimum_required(VERSION 3.0 FATAL_ERROR)
+
+project("JabberHive - Storage")
+
+include(FindPkgConfig)
+
+add_subdirectory(src)
+add_definitions(-D_POSIX_SOURCE)
+add_definitions(-D_POSIX_C_SOURCE=200809L)
+
+set(CMAKE_C_FLAGS $ENV{CFLAGS})
+if(CMAKE_COMPILER_IS_GNUCC)
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -Wpedantic -Wconversion")
+ message(STATUS "GNUCC detected. Adding '-O3' parameter.")
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3")
+endif()
+message(STATUS "CFLAGS=${CMAKE_C_FLAGS}")
+
+# ${SRC_FILES} is recursively defined in the subdirectories.
+# Each subdirectory only adds the source files that are present at its level.
+add_executable(jabberhive-storage ${SRC_FILES})
+set_property(TARGET jabberhive-storage PROPERTY C_STANDARD 99)
+set_property(TARGET jabberhive-storage PROPERTY C_STANDARD_REQUIRED ON)
+
+find_package(Threads)
+target_link_libraries(jabberhive-storage ${CMAKE_THREAD_LIBS_INIT})
+target_link_libraries(jabberhive-storage m)
+
+## OPTION HANDLING #############################################################
+# TODO
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..f546fa2
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2017, Nathanaƫl Sensfelder
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+* Neither the name of JabberHive nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..313e70a
--- /dev/null
+++ b/README.md
@@ -0,0 +1,25 @@
+## What is JabberHive?
+JabberHive is a modular ChatBot system. All "modules" are in fact separate
+programs linked together using the JabberHive Protocol. Please refer to the
+protocol for more information.
+
+## Component Description
+* Filter for a JabberHive network.
+* Stores passing STRINGs from ?RLR and ?RL requests.
+
+## JabberHive Protocol Compatibility
+* **Protocol Version(s):** 1.
+* **Inbound Connections:** Multiple.
+* **Outbound Connections:** Multiple.
+* **Pipelining:** No.
+* **Behavior:** Filter.
+
+## Notes
+* Does not correctly reply to Pipelining & Protocol Version requests.
+* The jabberhive-cli Gateway can be used to feed a storage file to a JabberHive
+network.
+
+## Dependencies
+- POSIX compliant OS.
+- C compiler (with C99 support).
+- CMake.
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
new file mode 100644
index 0000000..0b242bc
--- /dev/null
+++ b/src/CMakeLists.txt
@@ -0,0 +1,12 @@
+add_subdirectory(error)
+add_subdirectory(core)
+add_subdirectory(filter)
+add_subdirectory(server)
+add_subdirectory(parameters)
+
+set(
+ SRC_FILES ${SRC_FILES}
+ ${CMAKE_CURRENT_SOURCE_DIR}/main.c
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt
new file mode 100644
index 0000000..f5ede7c
--- /dev/null
+++ b/src/core/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(
+ SRC_FILES ${SRC_FILES}
+ ${CMAKE_CURRENT_SOURCE_DIR}/index.c
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
+
diff --git a/src/core/index.c b/src/core/index.c
new file mode 100644
index 0000000..6066e3c
--- /dev/null
+++ b/src/core/index.c
@@ -0,0 +1,19 @@
+#include <math.h>
+
+#include "index_types.h"
+
+size_t JH_index_string_length (const JH_index i)
+{
+ if (i == 0)
+ {
+ return 1;
+ }
+ else if (i > 0)
+ {
+ return (size_t) (log10l((long double) i) + 1);
+ }
+ else
+ {
+ return (size_t) (log10l((long double) (-i)) + 2);
+ }
+}
diff --git a/src/core/index.h b/src/core/index.h
new file mode 100644
index 0000000..52077b6
--- /dev/null
+++ b/src/core/index.h
@@ -0,0 +1,8 @@
+#ifndef _JH_CORE_INDEX_H_
+#define _JH_CORE_INDEX_H_
+
+#include "index_types.h"
+
+size_t JH_index_string_length (const JH_index i);
+
+#endif
diff --git a/src/core/index_types.h b/src/core/index_types.h
new file mode 100644
index 0000000..2180815
--- /dev/null
+++ b/src/core/index_types.h
@@ -0,0 +1,28 @@
+#ifndef _JH_CORE_INDEX_TYPES_H_
+#define _JH_CORE_INDEX_TYPES_H_
+
+#include "../pervasive.h"
+
+/*
+ * JH_index is a replacement for size_t. As many indices are stored for every
+ * word learned, having control over which type of variable is used to represent
+ * those indices lets us scale the RAM usage.
+ */
+
+#include <limits.h>
+#include <stdint.h>
+
+/* Must be unsigned. */
+typedef unsigned int JH_index;
+
+/* Must be > 0. */
+#define JH_INDEX_MAX UINT_MAX
+#define JH_INDEX_TAG "%u"
+
+#ifndef JH_RUNNING_FRAMA_C
+ #if (JH_INDEX_MAX > SIZE_MAX)
+ #error "JH_index should not be able to go higher than a size_t variable."
+ #endif
+#endif
+
+#endif
diff --git a/src/error/CMakeLists.txt b/src/error/CMakeLists.txt
new file mode 100644
index 0000000..fa07534
--- /dev/null
+++ b/src/error/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(
+ SRC_FILES ${SRC_FILES}
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
+
diff --git a/src/error/error.h b/src/error/error.h
new file mode 100644
index 0000000..145c838
--- /dev/null
+++ b/src/error/error.h
@@ -0,0 +1,143 @@
+#ifndef _JH_ERROR_ERROR_H_
+#define _JH_ERROR_ERROR_H_
+
+#include <stdio.h>
+
+#include "../pervasive.h"
+
+#ifndef JH_DEBUG_PROGRAM_FLOW
+ #define JH_DEBUG_PROGRAM_FLOW (0 || JH_DEBUG_ALL)
+#endif
+
+#ifndef JH_DEBUG_CONFIG
+ #define JH_DEBUG_CONFIG (0 || JH_DEBUG_ALL)
+#endif
+
+#ifndef JH_DEBUG_LEARNING
+ #define JH_DEBUG_LEARNING (0 || JH_DEBUG_ALL)
+#endif
+
+#ifndef JH_DEBUG_NETWORK
+ #define JH_DEBUG_NETWORK 1
+#endif
+
+#ifndef JH_DEBUG_NETWORK
+ #define JH_DEBUG_NETWORK (0 || JH_DEBUG_ALL)
+#endif
+
+#define JH_ENABLE_WARNINGS_OUTPUT 1
+#define JH_ENABLE_RUNTIME_ERRORS_OUTPUT 1
+#define JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT 1
+#define JH_ENABLE_FATAL_ERROR_OUTPUT 1
+
+#ifdef JH_ENABLE_ERROR_LOCATION
+ #define JH_LOCATION " [" __FILE__ "][" JH_TO_STRING(__LINE__) "]"
+#else
+ #define JH_LOCATION ""
+#endif
+
+#define JH_PRINT_STDERR(io, symbol, str, ...)\
+ fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n", __VA_ARGS__);
+
+/*
+ * Given that we use preprocessor contants as flags, we can expect the compilers
+ * to remove the test condition for disabled flags. No need to be shy about
+ * allowing many debug options.
+ */
+
+#define JH_DEBUG(io, flag, str, ...)\
+ JH_ISOLATE\
+ (\
+ if (flag)\
+ {\
+ JH_PRINT_STDERR(io, "D", str, __VA_ARGS__);\
+ }\
+ )
+
+
+#define JH_WARNING(io, str, ...)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_WARNINGS_OUTPUT)\
+ {\
+ JH_PRINT_STDERR(io, "W", str, __VA_ARGS__);\
+ }\
+ )
+
+#define JH_ERROR(io, str, ...)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\
+ {\
+ JH_PRINT_STDERR(io, "E", str, __VA_ARGS__);\
+ }\
+ )
+
+#define JH_PROG_ERROR(io, str, ...)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\
+ {\
+ JH_PRINT_STDERR(io, "P", str, __VA_ARGS__);\
+ }\
+ )
+
+#define JH_FATAL(io, str, ...)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_FATAL_ERROR_OUTPUT)\
+ {\
+ JH_PRINT_STDERR(io, "F", str, __VA_ARGS__);\
+ }\
+ )
+
+/* For outputs without dynamic content (static). ******************************/
+
+#define JH_PRINT_S_STDERR(io, symbol, str)\
+ fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n");
+
+#define JH_S_DEBUG(io, flag, str)\
+ JH_ISOLATE\
+ (\
+ if (flag)\
+ {\
+ JH_PRINT_S_STDERR(io, "D", str);\
+ }\
+ )
+
+#define JH_S_WARNING(io, str)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_WARNINGS_OUTPUT)\
+ {\
+ JH_PRINT_S_STDERR(io, "W", str);\
+ }\
+ )
+
+#define JH_S_ERROR(io, str)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\
+ {\
+ JH_PRINT_S_STDERR(io, "E", str);\
+ }\
+ )
+
+#define JH_S_PROG_ERROR(io, str)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\
+ {\
+ JH_PRINT_S_STDERR(io, "P", str);\
+ }\
+ )
+
+#define JH_S_FATAL(io, str)\
+ JH_ISOLATE\
+ (\
+ if (JH_ENABLE_FATAL_ERROR_OUTPUT)\
+ {\
+ JH_PRINT_S_STDERR(io, "F", str);\
+ }\
+ )
+#endif
diff --git a/src/filter/CMakeLists.txt b/src/filter/CMakeLists.txt
new file mode 100644
index 0000000..c8add44
--- /dev/null
+++ b/src/filter/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(
+ SRC_FILES ${SRC_FILES}
+ ${CMAKE_CURRENT_SOURCE_DIR}/filter.c
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
diff --git a/src/filter/filter.c b/src/filter/filter.c
new file mode 100644
index 0000000..f5f89c7
--- /dev/null
+++ b/src/filter/filter.c
@@ -0,0 +1,368 @@
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "../pervasive.h"
+#include "../error/error.h"
+#include "../parameters/parameters.h"
+
+#include "filter.h"
+
+static int send_downstream
+(
+ struct JH_filter filter [const restrict static 1],
+ const int upstream_socket,
+ const int downstream_socket,
+ FILE storage_file [const restrict static 1]
+)
+{
+ char c;
+ ssize_t io_bytes;
+ const int old_errno = errno;
+
+ for (;;)
+ {
+ errno = 0;
+
+ io_bytes =
+ read
+ (
+ upstream_socket,
+ (void *) &c,
+ sizeof(char)
+ );
+
+ if (io_bytes == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Upstream read error %d (\"%s\").",
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+ else if (io_bytes == 0)
+ {
+ return 1;
+ }
+
+ switch (filter->buffer_index)
+ {
+ case 0:
+ if (c == '?')
+ {
+ filter->buffer_index = 1;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case 1:
+ if (c == 'R')
+ {
+ filter->buffer_index = 2;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case 2:
+ if (c == 'L')
+ {
+ filter->buffer_index = 3;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case 3:
+ if (c == ' ')
+ {
+ filter->buffer_index = JH_FILTER_BUFFER_SIZE;
+ }
+ else if (c == 'R')
+ {
+ filter->buffer_index = 4;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case 4:
+ if (c == ' ')
+ {
+ filter->buffer_index = JH_FILTER_BUFFER_SIZE;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case JH_FILTER_BUFFER_SIZE:
+ if (putc(c, storage_file) != c)
+ {
+ /* TODO */
+ JH_S_WARNING(stderr, "Writing to temp file failed.");
+ }
+
+ if (c == '\n')
+ {
+ fflush(storage_file);
+ }
+
+ break;
+
+ default:
+ break;
+ }
+
+ errno = 0;
+
+ io_bytes = write
+ (
+ downstream_socket,
+ (void *) &c,
+ sizeof(char)
+ );
+
+ if (io_bytes == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Upstream write error %d (\"%s\").",
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ if (c == '\n')
+ {
+ filter->buffer_index = 0;
+ filter->state = JH_FILTER_IS_SENDING_UPSTREAM;
+
+ return 0;
+ }
+ }
+}
+
+static int send_upstream
+(
+ struct JH_filter filter [const restrict static 1],
+ const int upstream_socket,
+ const int downstream_socket
+)
+{
+ char c;
+ ssize_t io_bytes;
+ const int old_errno = errno;
+
+ for (;;)
+ {
+ errno = 0;
+
+ io_bytes =
+ read
+ (
+ downstream_socket,
+ (void *) &c,
+ sizeof(char)
+ );
+
+ if (io_bytes == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Downstream read error %d (\"%s\").",
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+ else if (io_bytes == 0)
+ {
+ return 1;
+ }
+
+ errno = 0;
+
+ io_bytes = write
+ (
+ upstream_socket,
+ (void *) &c,
+ sizeof(char)
+ );
+
+ if (io_bytes == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Upstream write error %d (\"%s\").",
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ switch (filter->buffer_index)
+ {
+ case -1:
+ if (c == '\n')
+ {
+ filter->buffer_index = 0;
+ }
+ break;
+
+ case 0:
+ if (c == '!')
+ {
+ filter->buffer_index = 1;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case 1:
+ if ((c == 'N') || (c == 'P'))
+ {
+ filter->buffer_index = 2;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case 2:
+ if (c == ' ')
+ {
+ filter->buffer_index = 3;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ case 3:
+ if (c == '\n')
+ {
+ filter->buffer_index = 0;
+ filter->state = JH_FILTER_IS_SENDING_DOWNSTREAM;
+
+ return 0;
+ }
+ else
+ {
+ filter->buffer_index = -1;
+ }
+ break;
+
+ default:
+ JH_PROG_ERROR
+ (
+ stderr,
+ "Invalid value for 'filter->buffer_index': %d.",
+ filter->buffer_index
+ );
+
+ filter->buffer_index = 0;
+
+ return -1;
+ }
+ }
+
+ return -1;
+}
+
+/******************************************************************************/
+/** EXPORTED ******************************************************************/
+/******************************************************************************/
+
+int JH_filter_step
+(
+ struct JH_filter filter [const restrict static 1],
+ const int upstream_socket,
+ const int downstream_socket,
+ FILE storage_file [const restrict static 1]
+)
+{
+ switch (filter->state)
+ {
+ case JH_FILTER_IS_SENDING_DOWNSTREAM:
+ JH_DEBUG(stderr, 1, "<SENDING_DOWN> (index: %d)", filter->buffer_index);
+ return
+ send_downstream
+ (
+ filter,
+ upstream_socket,
+ downstream_socket,
+ storage_file
+ );
+
+ case JH_FILTER_IS_SENDING_UPSTREAM:
+ JH_DEBUG(stderr, 1, "<SENDING_UP> (index: %d)", filter->buffer_index);
+ return
+ send_upstream
+ (
+ filter,
+ upstream_socket,
+ downstream_socket
+ );
+
+ default:
+ return -1;
+ }
+}
+
+int JH_filter_initialize
+(
+ struct JH_filter filter [const restrict static 1]
+)
+{
+ filter->state = JH_FILTER_IS_SENDING_DOWNSTREAM;
+ filter->buffer_index = 0;
+
+ return 0;
+}
+
+void JH_filter_finalize
+(
+ struct JH_filter filter [const restrict static 1]
+)
+{
+ /* Nothing to do */
+}
diff --git a/src/filter/filter.h b/src/filter/filter.h
new file mode 100644
index 0000000..ccad593
--- /dev/null
+++ b/src/filter/filter.h
@@ -0,0 +1,27 @@
+#ifndef _JH_FILTER_H_
+#define _JH_FILTER_H_
+
+#include "../parameters/parameters_types.h"
+#include "../server/server_types.h"
+
+#include "filter_types.h"
+
+int JH_filter_initialize
+(
+ struct JH_filter filter [const restrict static 1]
+);
+
+int JH_filter_step
+(
+ struct JH_filter filter [const restrict static 1],
+ const int upstream_socket,
+ const int downstream_socket,
+ FILE storage_file [const restrict static 1]
+);
+
+void JH_filter_finalize
+(
+ struct JH_filter filter [const restrict static 1]
+);
+
+#endif
diff --git a/src/filter/filter_types.h b/src/filter/filter_types.h
new file mode 100644
index 0000000..3b9c28e
--- /dev/null
+++ b/src/filter/filter_types.h
@@ -0,0 +1,20 @@
+#ifndef _JH_FILTER_TYPES_H_
+#define _JH_FILTER_TYPES_H_
+
+#include <stdio.h>
+
+#define JH_FILTER_BUFFER_SIZE 5
+
+enum JH_filter_state
+{
+ JH_FILTER_IS_SENDING_DOWNSTREAM,
+ JH_FILTER_IS_SENDING_UPSTREAM
+};
+
+struct JH_filter
+{
+ enum JH_filter_state state;
+ int buffer_index;
+};
+
+#endif
diff --git a/src/main.c b/src/main.c
new file mode 100644
index 0000000..9216104
--- /dev/null
+++ b/src/main.c
@@ -0,0 +1,54 @@
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+
+#include "error/error.h"
+#include "parameters/parameters.h"
+#include "server/server.h"
+
+#include "pervasive.h"
+
+static void print_help (const char runnable [const restrict static 1])
+{
+ printf
+ (
+ "JabberHive - Limiter\n"
+ "Software Version %d\n"
+ "Protocol Version %d\n"
+ "\nUsages:\n"
+ " JH GATEWAY:\t%s SOCKET_NAME DESTINATION REPLY_RATE\n"
+ " SHOW HELP:\tAnything else.\n"
+ "\nParameters:\n"
+ " SOCKET_NAME:\tValid UNIX socket.\n"
+ " DESTINATION:\tValid UNIX socket.\n"
+ " REPLY_RATE:\tInteger [0,100].\n",
+ JH_PROGRAM_VERSION,
+ JH_PROTOCOL_VERSION,
+ runnable
+ );
+}
+
+
+int main (int const argc, const char * argv [const static argc])
+{
+ struct JH_parameters params;
+
+ if (JH_parameters_initialize(&params, argc, argv) < 0)
+ {
+ print_help(argv[0]);
+
+ return -1;
+ }
+
+ if (JH_server_main(&params) < 0)
+ {
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/src/parameters/CMakeLists.txt b/src/parameters/CMakeLists.txt
new file mode 100644
index 0000000..2aa7ece
--- /dev/null
+++ b/src/parameters/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(
+ SRC_FILES ${SRC_FILES}
+ ${CMAKE_CURRENT_SOURCE_DIR}/parameters.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/parameters_getters.c
+)
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
+
diff --git a/src/parameters/parameters.c b/src/parameters/parameters.c
new file mode 100644
index 0000000..80cb01b
--- /dev/null
+++ b/src/parameters/parameters.c
@@ -0,0 +1,100 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include <limits.h>
+
+#include "../core/index.h"
+#include "../error/error.h"
+
+#include "parameters.h"
+
+static void set_default_to_all_fields
+(
+ struct JH_parameters param [const restrict static 1]
+)
+{
+ param->main_storage_filename = JH_PARAMETERS_DEFAULT_MAIN_STORAGE_FILENAME;
+ param->temp_storage_prefix = JH_PARAMETERS_DEFAULT_TEMP_STORAGE_PREFIX;
+ param->temp_storage_prefix_length = strlen(param->temp_storage_prefix);
+ param->socket_name = (const char *) NULL;
+ param->dest_socket_name = (const char *) NULL;
+}
+
+static int is_valid
+(
+ struct JH_parameters param [const restrict static 1]
+)
+{
+ int valid;
+
+ valid = 1;
+
+ if (param->socket_name == (const char *) NULL)
+ {
+ JH_S_FATAL(stderr, "Missing parameter: This entity's socket name.");
+
+ valid = 0;
+ }
+
+ if (param->dest_socket_name == (const char *) NULL)
+ {
+ JH_S_FATAL(stderr, "Missing parameter: The destination's socket name.");
+
+ valid = 0;
+ }
+
+ if
+ (
+ (SIZE_MAX - JH_index_string_length(JH_INDEX_MAX))
+ <= param->temp_storage_prefix_length
+ )
+ {
+ JH_S_FATAL(stderr, "Temporary storage prefix is too long.");
+
+ valid = 0;
+ }
+
+ return valid;
+}
+
+static void set_parameters
+(
+ struct JH_parameters param [const restrict static 1],
+ int const argc,
+ const char * argv [const static argc]
+)
+{
+ if (argc < 2)
+ {
+ return;
+ }
+
+ param->socket_name = argv[1];
+
+ if (argc < 3)
+ {
+ return;
+ }
+
+ param->dest_socket_name = argv[2];
+}
+
+int JH_parameters_initialize
+(
+ struct JH_parameters param [const restrict static 1],
+ int const argc,
+ const char * argv [const static argc]
+)
+{
+ set_default_to_all_fields(param);
+
+ set_parameters(param, argc, argv);
+
+ if (!is_valid(param))
+ {
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/src/parameters/parameters.h b/src/parameters/parameters.h
new file mode 100644
index 0000000..d8382a2
--- /dev/null
+++ b/src/parameters/parameters.h
@@ -0,0 +1,40 @@
+#ifndef _JH_CLI_PARAMETERS_H_
+#define _JH_CLI_PARAMETERS_H_
+
+#include <stdlib.h>
+
+#include "parameters_types.h"
+
+int JH_parameters_initialize
+(
+ struct JH_parameters param [const restrict static 1],
+ int const argc,
+ const char * argv [const static argc]
+);
+
+const char * JH_parameters_get_main_storage_filename
+(
+ const struct JH_parameters param [const restrict static 1]
+);
+
+const char * JH_parameters_get_temp_storage_prefix
+(
+ const struct JH_parameters param [const restrict static 1]
+);
+
+size_t JH_parameters_get_temp_storage_prefix_length
+(
+ const struct JH_parameters param [const restrict static 1]
+);
+
+const char * JH_parameters_get_socket_name
+(
+ const struct JH_parameters param [const restrict static 1]
+);
+
+const char * JH_parameters_get_dest_socket_name
+(
+ const struct JH_parameters param [const restrict static 1]
+);
+
+#endif
diff --git a/src/parameters/parameters_getters.c b/src/parameters/parameters_getters.c
new file mode 100644
index 0000000..75bf1fb
--- /dev/null
+++ b/src/parameters/parameters_getters.c
@@ -0,0 +1,43 @@
+#include <stdlib.h>
+
+#include "parameters.h"
+
+const char * JH_parameters_get_main_storage_filename
+(
+ const struct JH_parameters param [const restrict static 1]
+)
+{
+ return param->main_storage_filename;
+}
+
+const char * JH_parameters_get_temp_storage_prefix
+(
+ const struct JH_parameters param [const restrict static 1]
+)
+{
+ return param->temp_storage_prefix;
+}
+
+size_t JH_parameters_get_temp_storage_prefix_length
+(
+ const struct JH_parameters param [const restrict static 1]
+)
+{
+ return param->temp_storage_prefix_length;
+}
+
+const char * JH_parameters_get_socket_name
+(
+ const struct JH_parameters param [const restrict static 1]
+)
+{
+ return param->socket_name;
+}
+
+const char * JH_parameters_get_dest_socket_name
+(
+ const struct JH_parameters param [const restrict static 1]
+)
+{
+ return param->dest_socket_name;
+}
diff --git a/src/parameters/parameters_types.h b/src/parameters/parameters_types.h
new file mode 100644
index 0000000..c21d14b
--- /dev/null
+++ b/src/parameters/parameters_types.h
@@ -0,0 +1,22 @@
+#ifndef _JH_CLI_PARAMETERS_TYPES_H_
+#define _JH_CLI_PARAMETERS_TYPES_H_
+
+#include <stdlib.h>
+
+#define JH_PARAMETERS_COUNT 3
+
+#define JH_PARAMETERS_DEFAULT_MAIN_STORAGE_FILENAME "storage.txt"
+#define JH_PARAMETERS_DEFAULT_TEMP_STORAGE_PREFIX "/tmp/jabberhive-storage/storage_thread_"
+
+struct JH_parameters
+{
+ const char * restrict main_storage_filename;
+ const char * restrict temp_storage_prefix;
+ size_t temp_storage_prefix_length;
+
+ /* JH **********************************************************************/
+ const char * restrict socket_name;
+ const char * restrict dest_socket_name;
+};
+
+#endif
diff --git a/src/pervasive.h b/src/pervasive.h
new file mode 100644
index 0000000..27d832d
--- /dev/null
+++ b/src/pervasive.h
@@ -0,0 +1,28 @@
+#ifndef _JH_PERVASIVE_H_
+#define _JH_PERVASIVE_H_
+
+#include <string.h>
+
+#define JH_PROGRAM_VERSION 1
+#define JH_PROTOCOL_VERSION 1
+
+#ifdef __FRAMA_C__
+ #define JH_RUNNING_FRAMA_C 1
+#endif
+
+#define JH_DEBUG_ALL 1
+
+#ifndef JH_DEBUG_ALL
+ #define JH_DEBUG_ALL 0
+#endif
+
+#define JH__TO_STRING(x) #x
+#define JH_TO_STRING(x) JH__TO_STRING(x)
+#define JH_ISOLATE(a) do {a} while (0)
+
+/* strncmp stops at '\0' and strlen does not count '\0'. */
+#define JH_IS_PREFIX(a, b) (strncmp(a, b, strlen(a)) == 0)
+
+#define JH_STRING_EQUALS(a, b) (strcmp(a, b) == 0)
+
+#endif
diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt
new file mode 100644
index 0000000..625749b
--- /dev/null
+++ b/src/server/CMakeLists.txt
@@ -0,0 +1,16 @@
+set(
+ SRC_FILES ${SRC_FILES}
+ ${CMAKE_CURRENT_SOURCE_DIR}/server.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_create_socket.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_finalize.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_initialize.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_joining_threads.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_new_connection.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_signal.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_wait_for_event.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_worker.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_worker_data_merger.c
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
+
diff --git a/src/server/server.c b/src/server/server.c
new file mode 100644
index 0000000..640584f
--- /dev/null
+++ b/src/server/server.c
@@ -0,0 +1,101 @@
+#include <signal.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+int JH_server_main
+(
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ struct JH_server server;
+ JH_index retries;
+
+ retries = 0;
+ /* TODO
+ if (JH_server_set_signal_handlers < 0)
+ {
+ return -1;
+ }
+ */
+
+ if (JH_server_initialize(&server, params) < 0)
+ {
+ return -1;
+ }
+
+ while (JH_server_is_running())
+ {
+ switch (JH_server_wait_for_event(&server))
+ {
+ case 0: /* Timed out or signal'd. */
+ JH_S_DEBUG(stderr, 1, "Timed out...");
+ JH_server_handle_joining_threads(&server);
+
+ retries = 0;
+
+ break;
+
+ case 1: /* New client attempted connection. */
+ JH_S_DEBUG(stderr, 1, "New connection.");
+ JH_server_handle_joining_threads(&server);
+ (void) JH_server_handle_new_connection(&server);
+
+ retries = 0;
+
+ break;
+
+ case -1: /* Something bad happened. */
+ retries += 1;
+
+ if (retries == JH_SERVER_MAX_RETRIES)
+ {
+ JH_server_finalize(&server);
+
+ return -1;
+ }
+
+ break;
+
+ default:
+ JH_S_PROG_ERROR
+ (
+ stderr,
+ "Unexpected wait_for_event return value."
+ );
+
+ break;
+ }
+ }
+
+ /* Waiting for the threads to join... */
+ while (server.workers.currently_running > 0)
+ {
+ switch (JH_server_wait_for_event(&server))
+ {
+ case 0: /* Timed out. */
+ case 1: /* New client attempted connection. */
+ JH_server_handle_joining_threads(&server);
+ break;
+
+ case -1: /* Something bad happened. */
+ retries += 1;
+
+ if (retries == JH_SERVER_MAX_RETRIES)
+ {
+ JH_server_finalize(&server);
+
+ return -1;
+ }
+ break;
+ }
+ }
+
+ JH_server_finalize(&server);
+
+ return 0;
+}
+
diff --git a/src/server/server.h b/src/server/server.h
new file mode 100644
index 0000000..8f13662
--- /dev/null
+++ b/src/server/server.h
@@ -0,0 +1,98 @@
+#ifndef _JH_SERVER_SERVER_H_
+#define _JH_SERVER_SERVER_H_
+
+#include "../parameters/parameters_types.h"
+
+#include "server_types.h"
+
+int JH_server_initialize
+(
+ struct JH_server server [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+);
+
+int JH_server_socket_open
+(
+ struct JH_server_socket server_socket [const restrict static 1],
+ const char socket_name [const restrict static 1]
+);
+
+void JH_server_request_termination (void);
+int JH_server_is_running (void);
+int JH_server_set_signal_handlers (void);
+
+int JH_server_main
+(
+ const struct JH_parameters params [const restrict static 1]
+);
+
+void JH_server_finalize (struct JH_server [const restrict static 1]);
+
+int JH_server_wait_for_event
+(
+ struct JH_server server [const restrict static 1]
+);
+
+void JH_server_handle_joining_threads
+(
+ struct JH_server server [const restrict static 1]
+);
+
+int JH_server_handle_new_connection
+(
+ struct JH_server server [const restrict static 1]
+);
+
+int JH_server_worker_data_merger_thread_init
+(
+ struct JH_server server [const restrict static 1]
+);
+
+void * JH_server_worker_main (void * input);
+void * JH_server_worker_data_merger_main (void * input);
+
+/* Requires ownership of worker->params.thread_collection->mutex */
+FILE * JH_server_worker_open_storage_file
+(
+ struct JH_server_thread_collection collection [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1],
+ const JH_index thread_id,
+ const char mode [restrict static 1]
+);
+
+int JH_server_worker_receive
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_handle_request
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_send_confirm_pipelining_support
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_send_confirm_protocol_version
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_send_positive
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_send_negative
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+int JH_server_worker_send_generated_reply
+(
+ struct JH_server_worker worker [const restrict static 1]
+);
+
+#endif
diff --git a/src/server/server_create_socket.c b/src/server/server_create_socket.c
new file mode 100644
index 0000000..5e0c00b
--- /dev/null
+++ b/src/server/server_create_socket.c
@@ -0,0 +1,195 @@
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include "../error/error.h"
+
+#include "server.h"
+
+static int create_socket (int result [const restrict static 1])
+{
+ const int old_errno = errno;
+
+ errno = 0;
+ *result = socket(AF_UNIX, SOCK_STREAM, 0);
+
+ if (*result == -1)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to create server socket: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+static int bind_socket
+(
+ const int socket,
+ const char socket_name [const restrict static 1]
+)
+{
+ struct sockaddr_un addr;
+ const int old_errno = errno;
+
+ errno = 0;
+ memset(&addr, 0, sizeof(struct sockaddr_un));
+
+ addr.sun_family = AF_UNIX;
+
+ strncpy
+ (
+ (void *) addr.sun_path,
+ (const void *) socket_name,
+ (sizeof(addr.sun_path) - 1)
+ );
+
+ errno = old_errno;
+
+ if
+ (
+ bind
+ (
+ socket,
+ (const struct sockaddr *) &addr,
+ (socklen_t) sizeof(struct sockaddr_un)
+ ) != 0
+ )
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to bind server socket to %s: %s.",
+ socket_name,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+static int set_socket_to_unblocking (const int socket)
+{
+ int current_flags;
+ const int old_errno = errno;
+
+ current_flags = fcntl(socket, F_GETFD);
+
+ if (current_flags == -1)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to get server socket properties: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ /* current_flags = current_flags & (~O_NONBLOCK); */
+
+ current_flags = fcntl(socket, F_SETFD, (current_flags | O_NONBLOCK));
+
+ if (current_flags == -1)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to set server socket properties: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -2;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+static int set_socket_as_listener (const int socket)
+{
+ const int old_errno = errno;
+
+ if (listen(socket, JH_SERVER_SOCKET_LISTEN_BACKLOG) != 0)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to set server socket properties: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+int JH_server_socket_open
+(
+ struct JH_server_socket server_socket [const restrict static 1],
+ const char socket_name [const restrict static 1]
+)
+{
+ printf("\"%s\"\n", socket_name);
+ if (create_socket(&(server_socket->file_descriptor)) < 0)
+ {
+ return -1;
+ }
+
+ if (bind_socket(server_socket->file_descriptor, socket_name) < 0)
+ {
+ close(server_socket->file_descriptor);
+
+ return -1;
+ }
+
+ if (set_socket_to_unblocking(server_socket->file_descriptor) < 0)
+ {
+ close(server_socket->file_descriptor);
+
+ return -1;
+ }
+
+ if (set_socket_as_listener(server_socket->file_descriptor) < 0)
+ {
+ close(server_socket->file_descriptor);
+
+ return -1;
+ }
+
+ FD_ZERO(&(server_socket->as_a_set));
+ FD_SET(server_socket->file_descriptor, &(server_socket->as_a_set));
+
+ return 0;
+}
diff --git a/src/server/server_finalize.c b/src/server/server_finalize.c
new file mode 100644
index 0000000..81d1021
--- /dev/null
+++ b/src/server/server_finalize.c
@@ -0,0 +1,45 @@
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+static void finalize_thread_collection
+(
+ struct JH_server_thread_collection workers [const restrict static 1]
+)
+{
+ free((void *) workers->threads);
+ free((void *) workers->storage_filename);
+
+ workers->threads_capacity = 0;
+
+ pthread_mutex_destroy(&(workers->mutex));
+ pthread_barrier_destroy(&(workers->barrier));
+ pthread_mutex_destroy(&(workers->merger_mutex));
+ pthread_cond_destroy(&(workers->merger_condition));
+
+ workers->currently_running = 0;
+}
+
+static void finalize_socket
+(
+ struct JH_server_socket socket [const restrict static 1]
+)
+{
+ FD_ZERO(&(socket->as_a_set));
+
+ close(socket->file_descriptor);
+
+ socket->file_descriptor = -1;
+}
+
+void JH_server_finalize
+(
+ struct JH_server server [const restrict static 1]
+)
+{
+ finalize_thread_collection(&(server->workers));
+ finalize_socket(&(server->socket));
+}
diff --git a/src/server/server_initialize.c b/src/server/server_initialize.c
new file mode 100644
index 0000000..93a3b02
--- /dev/null
+++ b/src/server/server_initialize.c
@@ -0,0 +1,182 @@
+#include <signal.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "../core/index.h"
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+static int initialize_worker_collection
+(
+ struct JH_server_thread_collection c [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ int error;
+
+ c->threads = (struct JH_server_thread_data *) NULL;
+ c->threads_capacity = 0;
+ c->currently_running = 0;
+
+ error =
+ pthread_mutex_init
+ (
+ &(c->mutex),
+ (const pthread_mutexattr_t *) NULL
+ );
+
+ if (error != 0)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to initialize worker collection's mutex (error: %d): %s.",
+ error,
+ strerror(error)
+ );
+
+ return -1;
+ }
+
+ error =
+ pthread_barrier_init
+ (
+ &(c->barrier),
+ (const pthread_barrierattr_t *) NULL,
+ 2
+ );
+
+ if (error != 0)
+ {
+ pthread_mutex_destroy(&(c->mutex));
+
+ JH_FATAL
+ (
+ stderr,
+ "Unable to initialize worker collection's barrier (error: %d): %s.",
+ error,
+ strerror(error)
+ );
+
+ return -1;
+ }
+
+ error =
+ pthread_mutex_init
+ (
+ &(c->merger_mutex),
+ (const pthread_mutexattr_t *) NULL
+ );
+
+ if (error != 0)
+ {
+ pthread_mutex_destroy(&(c->mutex));
+ pthread_barrier_destroy(&(c->barrier));
+
+ JH_FATAL
+ (
+ stderr,
+ "Unable to initialize worker data merger mutex (error: %d): %s.",
+ error,
+ strerror(error)
+ );
+
+ return -1;
+ }
+
+ error =
+ pthread_cond_init
+ (
+ &(c->merger_condition),
+ (const pthread_condattr_t *) NULL
+ );
+
+ if (error != 0)
+ {
+ pthread_mutex_destroy(&(c->mutex));
+ pthread_barrier_destroy(&(c->barrier));
+ pthread_mutex_destroy(&(c->merger_mutex));
+
+ JH_FATAL
+ (
+ stderr,
+ "Unable to initialize worker data merger condition (error: %d): %s.",
+ error,
+ strerror(error)
+ );
+
+ return -1;
+ }
+
+ c->storage_filename =
+ (char *) calloc
+ (
+ (
+ JH_parameters_get_temp_storage_prefix_length(params)
+ + ((size_t) JH_index_string_length(JH_INDEX_MAX))
+ ),
+ sizeof(char)
+ );
+
+ if (c->storage_filename == (char *) NULL)
+ {
+ pthread_mutex_destroy(&(c->mutex));
+ pthread_barrier_destroy(&(c->barrier));
+ pthread_mutex_destroy(&(c->merger_mutex));
+ pthread_cond_destroy(&(c->merger_condition));
+
+ JH_FATAL
+ (
+ stderr,
+ "Unable to allocate memory to store worker temp storage filename "
+ "(error: %d): %s.",
+ error,
+ strerror(error)
+ );
+
+ return -1;
+ }
+
+ return 0;
+}
+
+void initialize_thread_parameters
+(
+ struct JH_server server [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ server->thread_params.thread_collection = &(server->workers);
+ server->thread_params.server_params = params;
+ server->thread_params.socket = -1;
+}
+
+int JH_server_initialize
+(
+ struct JH_server server [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ if (initialize_worker_collection(&(server->workers), params) < 0)
+ {
+ return -1;
+ }
+
+ if
+ (
+ JH_server_socket_open
+ (
+ &(server->socket),
+ JH_parameters_get_socket_name(params)
+ ) < 0
+ )
+ {
+ return -2;
+ }
+
+ initialize_thread_parameters(server, params);
+
+ return JH_server_worker_data_merger_thread_init(server);
+}
diff --git a/src/server/server_joining_threads.c b/src/server/server_joining_threads.c
new file mode 100644
index 0000000..e1d92ca
--- /dev/null
+++ b/src/server/server_joining_threads.c
@@ -0,0 +1,38 @@
+#include <sys/socket.h>
+
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+void JH_server_handle_joining_threads
+(
+ struct JH_server server [const restrict static 1]
+)
+{
+ JH_index i;
+
+ pthread_mutex_lock(&(server->workers.mutex));
+
+ for (i = 0; i < server->workers.threads_capacity; ++i)
+ {
+ if (server->workers.threads[i].state == JH_SERVER_JOINING_THREAD)
+ {
+ JH_DEBUG(stderr, 1, "Joining thread %u", i);
+
+ pthread_join(server->workers.threads[i].posix_id, (void **) NULL);
+
+ server->workers.threads[i].state = JH_SERVER_NO_THREAD;
+
+ server->workers.currently_running -= 1;
+ }
+ }
+
+ pthread_mutex_unlock(&(server->workers.mutex));
+}
diff --git a/src/server/server_new_connection.c b/src/server/server_new_connection.c
new file mode 100644
index 0000000..0734249
--- /dev/null
+++ b/src/server/server_new_connection.c
@@ -0,0 +1,202 @@
+#include <sys/socket.h>
+
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+static int get_new_socket (struct JH_server server [const restrict static 1])
+{
+ const int old_errno = errno;
+
+ server->thread_params.socket =
+ accept
+ (
+ server->socket.file_descriptor,
+ (struct sockaddr *) NULL,
+ (socklen_t *) NULL
+ );
+
+ if (server->thread_params.socket == -1)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Unable to accept on the server's socket: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+static int get_new_thread (struct JH_server server [const restrict static 1])
+{
+ struct JH_server_thread_data * new_threads;
+ JH_index i;
+
+ pthread_mutex_lock(&(server->workers.mutex));
+
+ for (i = 0; i < server->workers.threads_capacity; ++i)
+ {
+ if (server->workers.threads[i].state == JH_SERVER_NO_THREAD)
+ {
+ server->thread_params.thread_id = i;
+
+ pthread_mutex_unlock(&(server->workers.mutex));
+
+ return 0;
+ }
+ }
+
+ if
+ (
+ (server->workers.threads_capacity == JH_INDEX_MAX)
+ ||
+ (
+ (size_t) (server->workers.threads_capacity + 1)
+ > (SIZE_MAX / sizeof(struct JH_server_thread_data))
+ )
+ )
+ {
+ JH_S_ERROR
+ (
+ stderr,
+ "Maximum number of concurrent threads attained, unable to add more."
+ );
+
+ pthread_mutex_unlock(&(server->workers.mutex));
+
+ return -1;
+ }
+
+ server->thread_params.thread_id = server->workers.threads_capacity;
+ server->workers.threads_capacity += 1;
+
+ new_threads =
+ (struct JH_server_thread_data *) realloc
+ (
+ server->workers.threads,
+ (
+ sizeof(struct JH_server_thread_data)
+ * ((size_t) server->workers.threads_capacity)
+ )
+ );
+
+ if (new_threads == ((struct JH_server_thread_data *) NULL))
+ {
+ JH_S_ERROR
+ (
+ stderr,
+ "Reallocation of the threads' data list failed."
+ );
+
+ pthread_mutex_unlock(&(server->workers.mutex));
+
+ return -1;
+ }
+
+ server->workers.threads = new_threads;
+
+ pthread_mutex_unlock(&(server->workers.mutex));
+
+ return 0;
+}
+
+static int spawn_thread
+(
+ struct JH_server server [const restrict static 1],
+ void * (*thread_main) (void *)
+)
+{
+ const JH_index thread_id = server->thread_params.thread_id;
+ int error;
+
+ server->workers.threads[thread_id].state = JH_SERVER_RUNNING_THREAD;
+
+ error =
+ pthread_create
+ (
+ &(server->workers.threads[thread_id].posix_id),
+ (const pthread_attr_t *) NULL,
+ thread_main,
+ (void *) &(server->thread_params)
+ );
+
+ if (error != 0)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Unable to spawn thread: %s.",
+ strerror(error)
+ );
+
+ server->workers.threads[thread_id].state = JH_SERVER_NO_THREAD;
+
+ return -1;
+ }
+
+ pthread_barrier_wait(&(server->workers.barrier));
+
+ server->workers.currently_running += 1;
+
+ return 0;
+}
+
+int JH_server_handle_new_connection
+(
+ struct JH_server server [const restrict static 1]
+)
+{
+ if (get_new_socket(server) < 0)
+ {
+ return -1;
+ }
+
+ if (get_new_thread(server) < 0)
+ {
+ close(server->thread_params.socket);
+
+ return -2;
+ }
+
+ if (spawn_thread(server, JH_server_worker_main) < 0)
+ {
+ close(server->thread_params.socket);
+
+ return -3;
+ }
+
+ return 0;
+}
+
+int JH_server_worker_data_merger_thread_init
+(
+ struct JH_server server [const restrict static 1]
+)
+{
+ if (get_new_thread(server) < 0)
+ {
+ return -1;
+ }
+
+ if (spawn_thread(server, JH_server_worker_data_merger_main) < 0)
+ {
+ return -2;
+ }
+
+ return 0;
+}
diff --git a/src/server/server_signal.c b/src/server/server_signal.c
new file mode 100644
index 0000000..9361382
--- /dev/null
+++ b/src/server/server_signal.c
@@ -0,0 +1,41 @@
+#include <signal.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "server.h"
+
+static volatile char JH_SERVER_IS_RUNNING = (char) 1;
+
+static void request_termination (int const signo)
+{
+ if ((signo == SIGINT) || (signo == SIGTERM))
+ {
+ JH_server_request_termination();
+ }
+}
+
+void JH_server_request_termination (void)
+{
+ JH_SERVER_IS_RUNNING = (char) 0;
+}
+
+int JH_server_is_running (void)
+{
+ return (int) JH_SERVER_IS_RUNNING;
+}
+
+int JH_server_set_signal_handlers (void)
+{
+ /*
+ struct sigaction act;
+
+ act.sa_handler = request_termination;
+ act.sa_mask =
+ act.sa_flags =
+ act.sa_restorer =
+ */
+
+ /* TODO */
+
+ return -1;
+}
diff --git a/src/server/server_types.h b/src/server/server_types.h
new file mode 100644
index 0000000..3159ca5
--- /dev/null
+++ b/src/server/server_types.h
@@ -0,0 +1,87 @@
+#ifndef _JH_SERVER_SERVER_TYPES_H_
+#define _JH_SERVER_SERVER_TYPES_H_
+
+#include <sys/time.h>
+#include <stdio.h>
+
+#ifndef JH_RUNNING_FRAMA_C
+ #include <pthread.h>
+#endif
+
+#include "../core/index_types.h"
+
+#include "../parameters/parameters_types.h"
+
+#include "../error/error.h"
+
+#include "../filter/filter_types.h"
+
+#define JH_SERVER_MAX_RETRIES 10
+#define JH_SERVER_BUFFER_SIZE 0
+
+#define JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC 5
+#define JH_SERVER_SOCKET_LISTEN_BACKLOG 5
+
+#define JH_SERVER_WORKER_MAX_WAITING_TIME 5
+
+enum JH_server_thread_state
+{
+ JH_SERVER_JOINING_THREAD,
+ JH_SERVER_TERMINATED_THREAD,
+ JH_SERVER_RUNNING_THREAD,
+ JH_SERVER_NO_THREAD
+};
+
+struct JH_server_thread_data
+{
+#ifndef JH_RUNNING_FRAMA_C
+ pthread_t posix_id;
+#endif
+ enum JH_server_thread_state state;
+ FILE storage_file;
+};
+
+struct JH_server_thread_collection
+{
+ struct JH_server_thread_data * threads;
+ char * storage_filename; /* protected by mutex */
+ JH_index threads_capacity; /* protected by merger_mutex */
+#ifndef JH_RUNNING_FRAMA_C
+ pthread_mutex_t mutex;
+ pthread_barrier_t barrier;
+#endif
+ pthread_mutex_t merger_mutex;
+ pthread_cond_t merger_condition;
+ JH_index currently_running;
+};
+
+struct JH_server_socket
+{
+ int file_descriptor;
+ fd_set as_a_set;
+ struct timeval timeout;
+};
+
+struct JH_server_thread_parameters
+{
+ struct JH_server_thread_collection * thread_collection;
+ const struct JH_parameters * server_params;
+ JH_index thread_id;
+ int socket;
+};
+
+struct JH_server_worker
+{
+ struct JH_server_thread_parameters params;
+ int downstream_socket;
+ FILE * storage_file;
+};
+
+struct JH_server
+{
+ struct JH_server_thread_collection workers;
+ struct JH_server_socket socket;
+ struct JH_server_thread_parameters thread_params;
+};
+
+#endif
diff --git a/src/server/server_wait_for_event.c b/src/server/server_wait_for_event.c
new file mode 100644
index 0000000..c949438
--- /dev/null
+++ b/src/server/server_wait_for_event.c
@@ -0,0 +1,62 @@
+#include <sys/select.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "../error/error.h"
+
+#include "server.h"
+
+int JH_server_wait_for_event
+(
+ struct JH_server server [const static 1]
+)
+{
+ int ready_fds;
+ const int old_errno = errno;
+ fd_set ready_to_read;
+
+ ready_to_read = server->socket.as_a_set;
+
+ /* call to select may alter timeout */
+ memset((void *) &(server->socket.timeout), 0, sizeof(struct timeval));
+
+ server->socket.timeout.tv_sec = JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC;
+
+ errno = 0;
+
+ ready_fds = select
+ (
+ (server->socket.file_descriptor + 1),
+ &ready_to_read,
+ (fd_set *) NULL,
+ (fd_set *) NULL,
+ &(server->socket.timeout)
+ );
+
+ JH_DEBUG(stderr, 1, "SELECT returned: %i, errno is %i.", ready_fds, errno);
+
+ if (errno == EINTR)
+ {
+ ready_fds = 0;
+ }
+
+ if (ready_fds == -1)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to wait on server socket: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return ready_fds;
+}
diff --git a/src/server/server_worker.c b/src/server/server_worker.c
new file mode 100644
index 0000000..faf8c32
--- /dev/null
+++ b/src/server/server_worker.c
@@ -0,0 +1,274 @@
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+
+#include "../filter/filter.h"
+
+#include "../parameters/parameters.h"
+
+#include "server.h"
+
+static int connect_downstream
+(
+ struct JH_server_worker worker [const restrict static 1]
+)
+{
+ struct sockaddr_un addr;
+
+ const int old_errno = errno;
+
+ errno = 0;
+
+ if ((worker->downstream_socket = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to create socket: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ memset((void *) &addr, (int) 0, sizeof(addr));
+
+ addr.sun_family = AF_UNIX;
+
+ strncpy
+ (
+ (char *) addr.sun_path,
+ JH_parameters_get_dest_socket_name(worker->params.server_params),
+ (sizeof(addr.sun_path) - ((size_t) 1))
+ );
+
+ errno = 0;
+
+ if
+ (
+ connect
+ (
+ worker->downstream_socket,
+ (struct sockaddr *) &addr,
+ sizeof(addr)
+ )
+ == -1
+ )
+ {
+ JH_FATAL
+ (
+ stderr,
+ "Unable to connect to address: %s.",
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ close(worker->downstream_socket);
+
+ worker->downstream_socket = -1;
+
+ return -1;
+ }
+
+ errno = old_errno;
+
+ return 0;
+}
+
+static int initialize
+(
+ struct JH_server_worker worker [const restrict static 1],
+ void * input
+)
+{
+ memcpy
+ (
+ (void *) &(worker->params),
+ (const void *) input,
+ sizeof(struct JH_server_thread_parameters)
+ );
+
+ pthread_barrier_wait(&(worker->params.thread_collection->barrier));
+
+ worker->storage_file = (FILE *) NULL;
+ worker->downstream_socket = -1;
+
+ pthread_mutex_lock(&(worker->params.thread_collection->mutex));
+
+ worker->storage_file =
+ JH_server_worker_open_storage_file
+ (
+ worker->params.thread_collection,
+ worker->params.server_params,
+ worker->params.thread_id,
+ "w"
+ );
+
+ pthread_mutex_unlock(&(worker->params.thread_collection->mutex));
+
+ if (worker->storage_file == (FILE *) NULL)
+ {
+ return -1;
+ }
+
+ return connect_downstream(worker);
+}
+
+static void finalize
+(
+ struct JH_server_worker worker [const restrict static 1]
+)
+{
+ if (worker->downstream_socket != -1)
+ {
+ close(worker->downstream_socket);
+
+ worker->downstream_socket = -1;
+ }
+
+ if (worker->params.socket != -1)
+ {
+ close(worker->params.socket);
+
+ worker->params.socket = -1;
+ }
+
+ if (worker->storage_file != (FILE *) NULL)
+ {
+ fclose(worker->storage_file);
+
+ worker->storage_file = (FILE *) NULL;
+ }
+
+ pthread_mutex_lock(&(worker->params.thread_collection->mutex));
+
+ worker->params.thread_collection->threads[worker->params.thread_id].state =
+ JH_SERVER_TERMINATED_THREAD;
+
+ pthread_mutex_unlock(&(worker->params.thread_collection->mutex));
+
+ pthread_mutex_lock(&(worker->params.thread_collection->merger_mutex));
+ pthread_cond_signal(&(worker->params.thread_collection->merger_condition));
+ pthread_mutex_unlock(&(worker->params.thread_collection->merger_mutex));
+}
+
+
+/* Requires ownership of worker->params.thread_collection->mutex */
+FILE * JH_server_worker_open_storage_file
+(
+ struct JH_server_thread_collection collection [const restrict static 1],
+ const struct JH_parameters params [const restrict static 1],
+ const JH_index thread_id,
+ const char mode [restrict static 1]
+)
+{
+ FILE * in;
+ const int old_errno = errno;
+
+ errno = 0;
+
+ /**** (Re)generate the temporary storage file name *************************/
+ sprintf
+ (
+ collection->storage_filename,
+ (
+ "%s"
+ JH_INDEX_TAG
+ ),
+ JH_parameters_get_temp_storage_prefix(params),
+ thread_id
+ );
+
+ /**** Try to open the file *************************************************/
+ in = fopen(collection->storage_filename, mode);
+
+ if (in == (FILE *) NULL)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "Could not open the temporary storage file \"%s\""
+ " (errno: %d): %s.",
+ collection->storage_filename,
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return (FILE *) NULL;
+ }
+
+ errno = old_errno;
+
+ return in;
+}
+
+void * JH_server_worker_main (void * input)
+{
+ int status;
+ int timeout_count;
+ struct JH_filter filter;
+ struct JH_server_worker worker;
+
+ initialize(&worker, input);
+
+ if (JH_filter_initialize(&filter) < 0)
+ {
+ finalize(&worker);
+
+ return NULL;
+ }
+
+ timeout_count = 0;
+
+ while (JH_server_is_running())
+ {
+ status =
+ JH_filter_step
+ (
+ &filter,
+ worker.params.socket,
+ worker.downstream_socket,
+ worker.storage_file
+ );
+
+ if (status == 0)
+ {
+ timeout_count = 0;
+ }
+ else if (status == 1)
+ {
+ timeout_count += 1;
+
+ if (timeout_count == 2)
+ {
+ break;
+ }
+ else
+ {
+ sleep(JH_SERVER_WORKER_MAX_WAITING_TIME);
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ JH_filter_finalize(&filter);
+
+ finalize(&worker);
+
+ return NULL;
+}
diff --git a/src/server/server_worker_data_merger.c b/src/server/server_worker_data_merger.c
new file mode 100644
index 0000000..79638db
--- /dev/null
+++ b/src/server/server_worker_data_merger.c
@@ -0,0 +1,190 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+
+#include "../core/index.h"
+
+#include "../parameters/parameters.h"
+#include "../error/error.h"
+
+#include "../filter/filter.h"
+
+#include "server.h"
+
+static int initialize
+(
+ struct JH_server_worker worker [const restrict static 1],
+ void * input
+)
+{
+ memcpy
+ (
+ (void *) &(worker->params),
+ (const void *) input,
+ sizeof(struct JH_server_thread_parameters)
+ );
+
+ pthread_barrier_wait(&(worker->params.thread_collection->barrier));
+
+ return 0;
+}
+
+static void finalize
+(
+ struct JH_server_worker worker [const restrict static 1]
+)
+{
+ close(worker->params.socket);
+
+ pthread_mutex_lock(&(worker->params.thread_collection->mutex));
+
+ worker->params.thread_collection->threads[worker->params.thread_id].state =
+ JH_SERVER_JOINING_THREAD;
+
+ pthread_mutex_unlock(&(worker->params.thread_collection->mutex));
+}
+
+static FILE * open_output_file
+(
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ FILE * out;
+ const int old_errno = errno;
+
+ errno = 0;
+
+ out = fopen(JH_parameters_get_main_storage_filename(params), "a");
+
+ if (out == (FILE *) NULL)
+ {
+ JH_ERROR
+ (
+ stderr,
+ "The worker data merger could not open the main storage file \"%s\""
+ " (errno: %d): %s.",
+ JH_parameters_get_main_storage_filename(params),
+ errno,
+ strerror(errno)
+ );
+
+ errno = old_errno;
+
+ return (FILE *) NULL;
+ }
+
+ errno = old_errno;
+
+ return out;
+}
+
+static void merge_thread_data
+(
+ struct JH_server_thread_collection collection [const restrict static 1],
+ const JH_index thread_id,
+ const struct JH_parameters params [const restrict static 1]
+)
+{
+ char c;
+ FILE * in, * out;
+
+ /** Open input & output files **********************************************/
+ out = open_output_file(params);
+
+ if (out == (FILE *) NULL)
+ {
+ return;
+ }
+
+ in =
+ JH_server_worker_open_storage_file
+ (
+ collection,
+ params,
+ thread_id,
+ "r"
+ );
+
+ if (in == (FILE *) NULL)
+ {
+ fclose(out);
+
+ return;
+ }
+
+ /** Append content of 'in' to 'out' ****************************************/
+ while ((c = (char) fgetc(in)) != EOF)
+ {
+ if (fputc(c, out) == EOF)
+ {
+ break;
+ }
+ }
+
+ if ((ferror(in) != 0) || (ferror(out) != 0))
+ {
+ JH_S_ERROR
+ (
+ stderr,
+ "The Worker Data Merger could not append to the storage file.\n"
+ "Please check for any corruptions that could have been added to the "
+ "end of that file."
+ );
+ }
+
+ /** Close the files & return ***********************************************/
+ fclose(in);
+ fclose(out);
+
+ return;
+}
+
+void * JH_server_worker_data_merger_main (void * input)
+{
+ JH_index i;
+ struct JH_server_worker worker;
+
+ initialize(&worker, input);
+
+ pthread_mutex_lock(&(worker.params.thread_collection->merger_mutex));
+
+ while (JH_server_is_running())
+ {
+ pthread_cond_wait
+ (
+ &(worker.params.thread_collection->merger_condition),
+ &(worker.params.thread_collection->merger_mutex)
+ );
+
+ pthread_mutex_lock(&(worker.params.thread_collection->mutex));
+
+ for (i = 0; i < worker.params.thread_collection->threads_capacity; ++i)
+ {
+
+ if
+ (
+ worker.params.thread_collection->threads[i].state
+ == JH_SERVER_TERMINATED_THREAD
+ )
+ {
+ merge_thread_data
+ (
+ worker.params.thread_collection,
+ i,
+ worker.params.server_params
+ );
+
+ worker.params.thread_collection->threads[i].state =
+ JH_SERVER_JOINING_THREAD;
+ }
+ }
+
+ pthread_mutex_unlock(&(worker.params.thread_collection->mutex));
+ }
+
+ finalize(&worker);
+
+ return NULL;
+}