diff --git a/Pipe/Pipe.vcxproj b/Pipe/Pipe.vcxproj
index df6dfbe..5e7bf28 100644
--- a/Pipe/Pipe.vcxproj
+++ b/Pipe/Pipe.vcxproj
@@ -103,6 +103,7 @@
true
true
true
+ Speed
true
@@ -111,11 +112,12 @@
+
-
+
diff --git a/Pipe/Pipe.vcxproj.filters b/Pipe/Pipe.vcxproj.filters
index e85778a..9c7a987 100644
--- a/Pipe/Pipe.vcxproj.filters
+++ b/Pipe/Pipe.vcxproj.filters
@@ -21,16 +21,19 @@
Source Files
+
+ Source Files
+
-
- Header Files
-
Header Files
Header Files
+
+ Header Files
+
\ No newline at end of file
diff --git a/Pipe/hub.h b/Pipe/hub.h
new file mode 100644
index 0000000..c54642a
--- /dev/null
+++ b/Pipe/hub.h
@@ -0,0 +1,145 @@
+#ifndef HUB_H_
+#define HUB_H_
+
+#include
+#include "ringbuffer.h"
+#include "connector.h"
+
+/* microsoft specific */
+#ifdef _MSC_VER
+#define inline __inline
+#endif
+
+typedef struct hub_tt
+{
+ connector_t * input_connector;
+ connector_t * output_connector;
+ void * state;
+} hub_t;
+
+
+/*******************************************/
+/* Functions to work with the hub system. */
+/*******************************************/
+
+/*
+Inserts a element into a hub.
+So the signal can inserted to the hub system.
+*/
+static inline void Hub_Insert(hub_t * const hub, uint32_t element)
+{
+ Connector_Insert(hub->input_connector, 0, element);
+}
+
+/*
+Read an element from the hub.
+Used inside functions of the hub system.
+*/
+static inline uint32_t Hub_Read(hub_t * hub)
+{
+ return Connector_Read(hub->input_connector);
+}
+
+/*
+Write an element to connected hubs.
+Used inside functions of the hub system.
+*/
+static inline void Hub_Write(hub_t * hub, uint32_t element)
+{
+ Connector_Write(hub->output_connector, element);
+}
+
+/*
+Check if the hub contents elements.
+Used inside functions of the hub system.
+*/
+static inline uint8_t Hub_IsFilled(hub_t * hub)
+{
+ return Connector_IsFilled(hub->input_connector);
+}
+
+static inline uint8_t Hub_IsEmpty(hub_t * hub)
+{
+ return Connector_IsEmpty(hub->input_connector);
+}
+
+/*
+Check if the hub has no place left for new elements.
+*/
+static inline uint8_t Hub_IsFull(const hub_t * hub)
+{
+ return Connector_IsFull(hub->output_connector);
+}
+
+
+/***************************************/
+/* Functions to construct hub system. */
+/***************************************/
+
+#define Concat2(a, b) a ## b
+#define Concat(a, b) Concat2(a, b)
+
+/*
+Macro for the creation of a hub.
+Automates the creation of a ring buffer and the hub.
+arg_name is the variable name and string name of the hub.
+arg_size is the ring buffer size in bytes.
+arg_conn_count is the number of outgoing connections.
+arg_state is the given state, which can be used in the function.
+arg_log is the log function, called when an element is sent.
+*/
+#define Hub_Create(name, input_connection_count, output_connection_count, state) \
+static Connector_Create(Concat(name, Concat(_input_connector, __LINE__)), input_connection_count); \
+static Connector_Create(Concat(name, Concat(_output_connector, __LINE__)), output_connection_count); \
+hub_t name; \
+Hub_Init(&name, &Concat(name, Concat(_input_connector, __LINE__)), &Concat(name, Concat(_output_connector, __LINE__)), state)
+
+/*
+Initializes a hub.
+A Ringbuffer is needed to store elements from other hubs.
+A State (NULL if function has no state) for the function using the hub.
+*/
+static inline void Hub_Init(hub_t * const hub, connector_t * const input_connector, connector_t * const output_connector, void * const state)
+{
+ hub->input_connector = input_connector;
+ hub->output_connector = output_connector;
+ hub->state = state;
+}
+
+/* Connect two hubs. hub a sends elements to hub b. */
+static inline void Hub_Connect(hub_t * const source, hub_t * const target, ringbuffer_t * ring_buffer)
+{
+ Connector_Add(source->output_connector, ring_buffer);
+ Connector_Add(target->input_connector, ring_buffer);
+}
+
+#define Hub_CreateConnection(source, target, size) \
+RingBuffer_Create(Concat(connector_ring_buffer, __LINE__), size); \
+Hub_Connect(&source, &target, &Concat(connector_ring_buffer, __LINE__))
+
+
+/*
+If you want to connect the hub with a ring buffer, use Hub_ConnectInputWithRingBuffer
+or Hub_ConnectOutputWithRingBuffer.
+*/
+
+static inline void Hub_ConnectInputWithRingBuffer(hub_t * const hub, ringbuffer_t * ring_buffer)
+{
+ Connector_Add(hub->input_connector, ring_buffer);
+}
+
+static inline void Hub_ConnectOutputWithRingBuffer(hub_t * const hub, ringbuffer_t * ring_buffer)
+{
+ Connector_Add(hub->output_connector, ring_buffer);
+}
+
+static inline void Hub_AddInputBuffer(hub_t * const hub, ringbuffer_t * input_buffer)
+{
+ Connector_Add(hub->input_connector, input_buffer);
+}
+
+#define Hub_CreateInputBuffer(hub_name, size) \
+RingBuffer_Create(Concat(connector_ring_buffer, __LINE__), size); \
+Hub_AddInputBuffer(&hub_name, &Concat(connector_ring_buffer, __LINE__))
+
+#endif
diff --git a/Pipe/main.c b/Pipe/main.c
index c5c0784..b35fd90 100644
--- a/Pipe/main.c
+++ b/Pipe/main.c
@@ -5,88 +5,88 @@
#include "ringbuffer.h"
#include "connector.h"
-#include "pipe.h"
+#include "hub.h"
#ifdef _MSC_VER
#define inline __inline
#endif
/* Integrate every element of the signal. */
-void increment(pipe_t * const p)
+void increment(hub_t * const p)
{
- while (Pipe_IsFilled(p))
+ while (Hub_IsFilled(p))
{
- uint32_t item = Pipe_Read(p);
+ uint32_t item = Hub_Read(p);
item++;
- Pipe_Write(p, item);
+ Hub_Write(p, item);
}
}
/* Square every element of the signal. */
-void square(pipe_t * const p)
+void square(hub_t * const p)
{
- while (Pipe_IsFilled(p))
+ while (Hub_IsFilled(p))
{
- uint32_t item = Pipe_Read(p);
+ uint32_t item = Hub_Read(p);
item = item * item;
- Pipe_Write(p, item);
+ Hub_Write(p, item);
}
}
/* Integrate over every element of the signal. */
-void integrate(pipe_t * const pipe)
+void integrate(hub_t * const hub)
{
- uint32_t state = *((uint32_t*)pipe->state);
+ uint32_t state = *((uint32_t*)hub->state);
- while (Pipe_IsFilled(pipe))
+ while (Hub_IsFilled(hub))
{
- uint32_t item = Pipe_Read(pipe);
+ uint32_t item = Hub_Read(hub);
state = state + item;
- Pipe_Write(pipe, state);
+ Hub_Write(hub, state);
}
- *((uint32_t*)pipe->state) = state;
+ *((uint32_t*)hub->state) = state;
}
/* Build the sum of all elements of the signal. */
-void sum(pipe_t * const pipe)
+void sum(hub_t * const hub)
{
uint32_t sum = 0;
- while (Pipe_IsFilled(pipe))
- sum += Pipe_Read(pipe);
- Pipe_Write(pipe, sum);
+ while (Hub_IsFilled(hub))
+ sum += Hub_Read(hub);
+ Hub_Write(hub, sum);
}
/* Build the average of all elements of the signal. */
-void average(pipe_t * const pipe)
+void average(hub_t * const hub)
{
uint32_t sum = 0;
uint32_t element_counter = 0;
uint32_t average = 0;
- while (Pipe_IsFilled(pipe))
+ while (Hub_IsFilled(hub))
{
- sum += Pipe_Read(pipe);
+ sum += Hub_Read(hub);
element_counter++;
}
average = sum / element_counter;
- Pipe_Write(pipe, average);
+ Hub_Write(hub, average);
}
/* Print the signal. */
-void print(pipe_t * const pipe)
+void print(hub_t * const hub)
{
printf("\nOutput:\n");
- while (Pipe_IsFilled(pipe))
- printf("%d\n", Pipe_Read(pipe));
+ while (Hub_IsFilled(hub))
+ printf("%d\n", Hub_Read(hub));
}
/* Logging function. Set by user. */
-void log(pipe_t * const source, pipe_t * const target, uint32_t element)
-{
- //if (Pipe_IsFull(target))
- // printf("Error: Pipe %s is full!\n", target->name);
+//void log(hub_t * const source, hub_t * const target, uint32_t element)
+//{
+ //if (Hub_IsFull(target))
+ // printf("Error: Hub %s is full!\n", target->name);
//if (source->state == NULL && target->state == NULL)
// printf("%s -> %d -> %s\n", source->name, element, target->name);
@@ -97,10 +97,11 @@ void log(pipe_t * const source, pipe_t * const target, uint32_t element)
//else
// printf("%s -> %d -> %s(%d)\n", source->name, element, target->name, *((uint32_t*)target->state));
- printf("%s: %d\n", source->name, element);
-}
+ //printf("%s: %d\n", source->name, element);
+//}
extern void threads(uint32_t loops);
+extern void speed(uint32_t items_count);
int main(int argc, char *argv[])
{
@@ -108,47 +109,51 @@ int main(int argc, char *argv[])
if (argc == 1)
{
- /* Create pipes and connect them */
- Pipe_Create(increment_pipe, 1, 1, NULL, log);
- Pipe_Create(square_pipe, 1, 1, NULL, log);
- Pipe_Create(integrate_pipe, 2, 2, &counter, log);
- Pipe_Create(sum_pipe, 1, 1, NULL, log);
- Pipe_Create(average_pipe, 1, 1, NULL, log);
- Pipe_Create(print_pipe, 2, 1, NULL, log);
+ /* Create hubs and connect them */
+ Hub_Create(increment_hub, 1, 1, NULL);
+ Hub_Create(square_hub, 1, 1, NULL);
+ Hub_Create(integrate_hub, 2, 2, &counter);
+ Hub_Create(sum_hub, 1, 1, NULL);
+ Hub_Create(average_hub, 1, 1, NULL);
+ Hub_Create(print_hub, 2, 1, NULL);
- Pipe_CreateInputBuffer(increment_pipe, 4);
- Pipe_CreateInputBuffer(square_pipe, 4);
+ Hub_CreateInputBuffer(increment_hub, 4);
+ Hub_CreateInputBuffer(square_hub, 4);
- Pipe_CreateConnection(increment_pipe, integrate_pipe, 4);
- Pipe_CreateConnection(square_pipe, integrate_pipe, 4);
- Pipe_CreateConnection(integrate_pipe, sum_pipe, 8);
- Pipe_CreateConnection(integrate_pipe, average_pipe, 8);
- Pipe_CreateConnection(sum_pipe, print_pipe, 4);
- Pipe_CreateConnection(average_pipe, print_pipe, 4);
+ Hub_CreateConnection(increment_hub, integrate_hub, 4);
+ Hub_CreateConnection(square_hub, integrate_hub, 4);
+ Hub_CreateConnection(integrate_hub, sum_hub, 8);
+ Hub_CreateConnection(integrate_hub, average_hub, 8);
+ Hub_CreateConnection(sum_hub, print_hub, 4);
+ Hub_CreateConnection(average_hub, print_hub, 4);
/* Create Input */
- Pipe_Insert(&increment_pipe, 1);
- Pipe_Insert(&increment_pipe, 3);
- Pipe_Insert(&increment_pipe, 5);
+ Hub_Insert(&increment_hub, 1);
+ Hub_Insert(&increment_hub, 3);
+ Hub_Insert(&increment_hub, 5);
- Pipe_Insert(&square_pipe, 2);
- Pipe_Insert(&square_pipe, 4);
- Pipe_Insert(&square_pipe, 6);
+ Hub_Insert(&square_hub, 2);
+ Hub_Insert(&square_hub, 4);
+ Hub_Insert(&square_hub, 6);
/* run the functions (each can run in an own thread) */
- increment(&increment_pipe);
- square(&square_pipe);
- integrate(&integrate_pipe);
- sum(&sum_pipe);
- average(&average_pipe);
- print(&print_pipe);
+ increment(&increment_hub);
+ square(&square_hub);
+ integrate(&integrate_hub);
+ sum(&sum_hub);
+ average(&average_hub);
+ print(&print_hub);
getchar();
}
- else
+ else if (argc == 2)
{
threads(atoi(argv[1]));
}
+ else
+ {
+ speed(atoi(argv[2]));
+ }
return 0;
}
diff --git a/Pipe/pipe.h b/Pipe/pipe.h
deleted file mode 100644
index fba80e8..0000000
--- a/Pipe/pipe.h
+++ /dev/null
@@ -1,161 +0,0 @@
-#ifndef PIPE_H_
-#define PIPE_H_
-
-#include
-#include "ringbuffer.h"
-#include "connector.h"
-
-/* microsoft specific */
-#ifdef _MSC_VER
-#define inline __inline
-#endif
-
-typedef struct pipe_tt
-{
- connector_t * input_connector;
- connector_t * output_connector;
- void * state;
- char * name;
- void(*log_function)(struct pipe_tt * from, struct pipe_tt * to, uint32_t elem);
-} pipe_t;
-
-
-/*******************************************/
-/* Functions to work with the pipe system. */
-/*******************************************/
-
-/*
-Inserts a element into a pipe.
-So the signal can inserted to the pipe system.
-*/
-static inline void Pipe_Insert(pipe_t * const pipe, uint32_t element)
-{
- Connector_Insert(pipe->input_connector, 0, element);
-}
-
-/*
-Read an element from the pipe.
-Used inside functions of the pipe system.
-*/
-static inline uint32_t Pipe_Read(pipe_t * pipe)
-{
- return Connector_Read(pipe->input_connector);
-}
-
-/*
-Write an element to connected pipes.
-Used inside functions of the pipe system.
-*/
-static inline void Pipe_Write(pipe_t * pipe, uint32_t element)
-{
- pipe->log_function(pipe, NULL, element);
- Connector_Write(pipe->output_connector, element);
-}
-
-/*
-Check if the pipe contents elements.
-Used inside functions of the pipe system.
-*/
-static inline uint8_t Pipe_IsFilled(pipe_t * pipe)
-{
- return Connector_IsFilled(pipe->input_connector);
-}
-
-/*
-Check if the pipe has no place left for new elements.
-Usefull for the logging.
-*/
-static inline uint8_t Pipe_IsFull(const pipe_t * pipe)
-{
- return Connector_IsFull(pipe->input_connector);
-}
-
-/*
-Default Logging function.
-Shall be set by user.
-*/
-static inline void Pipe_Log(pipe_t * const source, pipe_t * const target, uint32_t element)
-{
- if (Pipe_IsFull(target))
- printf("Error: Pipe %s is full!\n", target->name);
-
- if (source->state == NULL && target->state == NULL)
- printf("%s -> %d -> %s\n", source->name, element, target->name);
- else if (source->state != NULL && target->state != NULL)
- printf("%s(%d) -> %d -> %s(%d)\n", source->name, *((uint32_t*)source->state), element, target->name, *((uint32_t*)target->state));
- else if (source->state != NULL)
- printf("%s(%d) -> %d -> %s\n", source->name, *((uint32_t*)source->state), element, target->name);
- else
- printf("%s -> %d -> %s(%d)\n", source->name, element, target->name, *((uint32_t*)target->state));
-}
-
-
-/***************************************/
-/* Functions to construct pipe system. */
-/***************************************/
-
-#define Concat2(a, b) a ## b
-#define Concat(a, b) Concat2(a, b)
-
-/*
-Macro for the creation of a pipe.
-Automates the creation of a ring buffer and the pipe.
-arg_name is the variable name and string name of the pipe.
-arg_size is the ring buffer size in bytes.
-arg_conn_count is the number of outgoing connections.
-arg_state is the given state, which can be used in the function.
-arg_log is the log function, called when an element is sent.
-*/
-#define Pipe_Create(name, input_connection_count, output_connection_count, state, log) \
-static Connector_Create(Concat(name, Concat(_input_connector, __LINE__)), input_connection_count); \
-static Connector_Create(Concat(name, Concat(_output_connector, __LINE__)), output_connection_count); \
-pipe_t name; \
-Pipe_Init(&name, &Concat(name, Concat(_input_connector, __LINE__)), &Concat(name, Concat(_output_connector, __LINE__)), state, #name, log)
-
-/*
-Initializes a pipe.
-A Ringbuffer is needed to store elements from other pipes.
-A State (NULL if function has no state) for the function using the pipe.
-A Name and a logging function are usefull to track the dataflow.
-*/
-static inline void Pipe_Init(
- pipe_t * const pipe,
- connector_t * input_connector,
- connector_t * output_connector,
- void * state,
- char * const name,
- void(*log_function)(struct pipe_tt * source, struct pipe_tt * target, uint32_t element)
- )
-{
- pipe->input_connector = input_connector;
- pipe->output_connector = output_connector;
- pipe->state = state;
- pipe->name = name;
-
- if (log_function == NULL)
- pipe->log_function = Pipe_Log;
- else
- pipe->log_function = log_function;
-}
-
-/* Connect two pipes. Pipe a sends elements to pipe b. */
-static inline void Pipe_Connect(pipe_t * const source, pipe_t * const target, ringbuffer_t * ring_buffer)
-{
- Connector_Add(source->output_connector, ring_buffer);
- Connector_Add(target->input_connector, ring_buffer);
-}
-
-#define Pipe_CreateConnection(source, target, size) \
-RingBuffer_Create(Concat(connector_ring_buffer, __LINE__), size); \
-Pipe_Connect(&source, &target, &Concat(connector_ring_buffer, __LINE__))
-
-static inline void Pipe_AddInputBuffer(pipe_t * const pipe, ringbuffer_t * input_buffer)
-{
- Connector_Add(pipe->input_connector, input_buffer);
-}
-
-#define Pipe_CreateInputBuffer(pipe_name, size) \
-RingBuffer_Create(Concat(connector_ring_buffer, __LINE__), size); \
-Pipe_AddInputBuffer(&pipe_name, &Concat(connector_ring_buffer, __LINE__))
-
-#endif
diff --git a/Pipe/ringbuffer.h b/Pipe/ringbuffer.h
index faeefdf..1a71519 100644
--- a/Pipe/ringbuffer.h
+++ b/Pipe/ringbuffer.h
@@ -7,6 +7,12 @@
#define inline __inline
#endif
+/*
+A thread-safe ring buffer, which uses an pre-declared array as buffer.
+But you have to declare a buffer with one more item than you really need.
+For instance, if you need a ring buffer for 10 items, the array must be declared for 11 items.
+*/
+
typedef struct {
uint32_t * reader;
uint32_t * writer;
@@ -19,9 +25,6 @@ static inline void RingBuffer_Init(ringbuffer_t * const ring_buffer, uint32_t *
{
ring_buffer->start = ring_buffer->reader = ring_buffer->writer = &array[0];
ring_buffer->end = &array[0] + size - 1;
-
- for (uint32_t i = 0; i < size; i++)
- array[i] = 0;
}
static inline uint32_t * RingBuffer_NextAddress(ringbuffer_t * const ring_buffer, uint32_t * const pointer)
@@ -51,7 +54,6 @@ static inline uint8_t RingBuffer_IsFilled(ringbuffer_t * const ring_buffer)
return !RingBuffer_IsEmpty(ring_buffer);
}
-/* Write element into RingBuffer. */
static inline void RingBuffer_Write(ringbuffer_t * const ring_buffer, uint32_t element)
{
if (!RingBuffer_IsFull(ring_buffer))
@@ -61,7 +63,6 @@ static inline void RingBuffer_Write(ringbuffer_t * const ring_buffer, uint32_t e
}
}
-/* Read value from RingBuffer. */
static inline uint32_t RingBuffer_Read(ringbuffer_t * const ring_buffer)
{
uint32_t element = 0;
@@ -69,19 +70,32 @@ static inline uint32_t RingBuffer_Read(ringbuffer_t * const ring_buffer)
if (!RingBuffer_IsEmpty(ring_buffer))
{
element = *(ring_buffer->reader);
- *(ring_buffer->reader) = 30;
ring_buffer->reader = RingBuffer_NextAddress(ring_buffer, ring_buffer->reader);
}
return element;
}
+
+#define Concat2(a, b) a ## b
+#define Concat(a, b) Concat2(a, b)
+#define UniqueBuffer(name) Concat(name, Concat(_buffer, __LINE__))
+
/*
Macro for the creation of a ring buffer.
*/
-#define RingBuffer_Create(name, size) \
-uint32_t Concat(name, Concat(_buffer, __LINE__))[size]; \
-ringbuffer_t name; \
-RingBuffer_Init(&name, Concat(name, Concat(_buffer, __LINE__)), size) \
+#define RingBuffer_Create(name, size) \
+uint32_t UniqueBuffer(name)[size]; \
+ringbuffer_t name; \
+RingBuffer_Init(&name, UniqueBuffer(name), size) \
+
+#define RingBuffer(name, size) \
+uint32_t UniqueBuffer(name)[size]; \
+ringbuffer_t name = { \
+ .reader = &UniqueBuffer(name)[0], \
+ .writer = &UniqueBuffer(name)[0], \
+ .start = &UniqueBuffer(name)[0], \
+ .end = &UniqueBuffer(name)[0] + sizeof(UniqueBuffer(name)) / sizeof(UniqueBuffer(name)[0]) \
+}
#endif
diff --git a/Pipe/speed.c b/Pipe/speed.c
new file mode 100644
index 0000000..1d35a3c
--- /dev/null
+++ b/Pipe/speed.c
@@ -0,0 +1,230 @@
+#include
+#include
+#include
+#include
+
+#include "ringbuffer.h"
+#include "hub.h"
+
+uint32_t counter;
+
+DWORD WINAPI Speed_Thread1(LPVOID lpParam)
+{
+ hub_t * hub = (hub_t *)lpParam;
+
+ /* Generate input */
+ for (uint32_t i = 0; i < counter; i++)
+ {
+ while(Hub_IsFull(hub)) {}
+ Hub_Write(hub, i);
+ }
+ while (Hub_IsFull(hub)) {}
+ Hub_Write(hub, UINT32_MAX);
+
+ return 0;
+}
+
+DWORD WINAPI Speed_Thread2(LPVOID lpParam)
+{
+ hub_t * hub = (hub_t *)lpParam;
+ uint32_t item = 0;
+
+ while (item != UINT32_MAX)
+ {
+ if (Hub_IsFilled(hub))
+ {
+ item = Hub_Read(hub);
+ //printf("%d\n", item);
+ while (Hub_IsFull(hub)) {}
+ Hub_Write(hub, item);
+ }
+ }
+
+ return 0;
+}
+
+DWORD WINAPI Speed_Thread3(LPVOID lpParam)
+{
+ hub_t * hub = (hub_t *)lpParam;
+ uint32_t item = 0;
+
+ while (item != UINT32_MAX)
+ {
+ if (Hub_IsFilled(hub))
+ {
+ item = Hub_Read(hub);
+ while (Hub_IsFull(hub)) {}
+ Hub_Write(hub, item);
+ }
+ }
+
+ return 0;
+}
+
+DWORD WINAPI Speed_Thread4(LPVOID lpParam)
+{
+ hub_t * hub = (hub_t *)lpParam;
+ uint32_t item = 0;
+
+ while (item != UINT32_MAX)
+ {
+ if (Hub_IsFilled(hub))
+ {
+ item = Hub_Read(hub);
+ while (Hub_IsFull(hub)) {}
+ Hub_Write(hub, item);
+ }
+ }
+
+ return 0;
+}
+
+DWORD WINAPI Speed_Thread5(LPVOID lpParam)
+{
+ hub_t * hub = (hub_t *)lpParam;
+ uint32_t item = 0;
+
+ while (item != UINT32_MAX)
+ {
+ if (Hub_IsFilled(hub))
+ {
+ item = Hub_Read(hub);
+ while (Hub_IsFull(hub)) {}
+ Hub_Write(hub, item);
+ }
+ }
+
+ return 0;
+}
+
+DWORD WINAPI Speed_Thread6(LPVOID lpParam)
+{
+ hub_t * hub = (hub_t *)lpParam;
+ uint32_t item = 0;
+
+ while (item != UINT32_MAX)
+ {
+ if (Hub_IsFilled(hub))
+ {
+ item = Hub_Read(hub);
+ while (Hub_IsFull(hub)) {}
+ Hub_Write(hub, item);
+ }
+ }
+
+ return 0;
+}
+
+DWORD WINAPI Speed_Thread7(LPVOID lpParam)
+{
+ hub_t * hub = (hub_t *)lpParam;
+ uint32_t item = 0;
+
+ while (item != UINT32_MAX)
+ {
+ if (Hub_IsFilled(hub))
+ {
+ item = Hub_Read(hub);
+ while (Hub_IsFull(hub)) {}
+ Hub_Write(hub, item);
+ }
+ }
+
+ return 0;
+}
+
+DWORD WINAPI Speed_Thread8(LPVOID lpParam)
+{
+ hub_t * hub = (hub_t *)lpParam;
+ uint32_t item = 0;
+
+ while (item != UINT32_MAX)
+ {
+ if (Hub_IsFilled(hub))
+ {
+ item = Hub_Read(hub);
+ }
+ }
+
+ return 0;
+}
+
+void speed(uint32_t items_count)
+{
+ counter = items_count;
+
+ /* Create hubs and connect them */
+ Hub_Create(hub1, 1, 1, NULL);
+ Hub_Create(hub2, 1, 1, NULL);
+ Hub_Create(hub3, 1, 1, NULL);
+ Hub_Create(hub4, 1, 1, NULL);
+ Hub_Create(hub5, 1, 1, NULL);
+ Hub_Create(hub6, 1, 1, NULL);
+ Hub_Create(hub7, 1, 1, NULL);
+ Hub_Create(hub8, 1, 1, NULL);
+
+#define RINGBUFFER_SIZE 1024
+ Hub_CreateConnection(hub1, hub2, RINGBUFFER_SIZE);
+ Hub_CreateConnection(hub2, hub3, RINGBUFFER_SIZE);
+ Hub_CreateConnection(hub3, hub4, RINGBUFFER_SIZE);
+ Hub_CreateConnection(hub4, hub5, RINGBUFFER_SIZE);
+ Hub_CreateConnection(hub5, hub6, RINGBUFFER_SIZE);
+ Hub_CreateConnection(hub6, hub7, RINGBUFFER_SIZE);
+ Hub_CreateConnection(hub7, hub8, RINGBUFFER_SIZE);
+
+ SYSTEMTIME local_time;
+ GetLocalTime(&local_time);
+
+ printf("\n");
+ printf("Starting Speed Test\n");
+ printf("at %02d/%02d/%02d %02d:%02d:%02d\n", local_time.wYear, local_time.wMonth, local_time.wDay, local_time.wHour, local_time.wMinute, local_time.wSecond);
+ printf("\n");
+
+ LARGE_INTEGER Frequency;
+ LARGE_INTEGER StartingTime, EndingTime, ElapsedMicroseconds;
+ QueryPerformanceFrequency(&Frequency);
+ QueryPerformanceCounter(&StartingTime);
+
+
+ /* Create Threads */
+ HANDLE threadHandles[8];
+
+ threadHandles[0] = CreateThread(NULL, 0, Speed_Thread1, &hub1, 0, NULL);
+ threadHandles[1] = CreateThread(NULL, 0, Speed_Thread2, &hub2, 0, NULL);
+ threadHandles[2] = CreateThread(NULL, 0, Speed_Thread3, &hub3, 0, NULL);
+ threadHandles[3] = CreateThread(NULL, 0, Speed_Thread4, &hub4, 0, NULL);
+ threadHandles[4] = CreateThread(NULL, 0, Speed_Thread5, &hub5, 0, NULL);
+ threadHandles[5] = CreateThread(NULL, 0, Speed_Thread6, &hub6, 0, NULL);
+ threadHandles[6] = CreateThread(NULL, 0, Speed_Thread7, &hub7, 0, NULL);
+ threadHandles[7] = CreateThread(NULL, 0, Speed_Thread8, &hub8, 0, NULL);
+
+ WaitForMultipleObjects(8, threadHandles, TRUE, INFINITE);
+
+ CloseHandle(threadHandles[0]);
+ CloseHandle(threadHandles[1]);
+ CloseHandle(threadHandles[2]);
+ CloseHandle(threadHandles[3]);
+ CloseHandle(threadHandles[4]);
+ CloseHandle(threadHandles[5]);
+ CloseHandle(threadHandles[6]);
+ CloseHandle(threadHandles[7]);
+
+ QueryPerformanceCounter(&EndingTime);
+ ElapsedMicroseconds.QuadPart = EndingTime.QuadPart - StartingTime.QuadPart;
+ ElapsedMicroseconds.QuadPart *= 1000000;
+ ElapsedMicroseconds.QuadPart /= Frequency.QuadPart;
+ int seconds = (int)ElapsedMicroseconds.QuadPart / 1000000;
+
+ int hours = seconds / 3600;
+ int minutes = (seconds - (hours * 3600)) / 60;
+ seconds = seconds - (minutes * 60);
+
+ printf("Count of Concurrent Data Operations: %d\n", items_count * 14);
+
+ printf("\n");
+ GetLocalTime(&local_time);
+ printf("Test finished\n");
+ printf("at %02d/%02d/%02d %02d:%02d:%02d\n", local_time.wYear, local_time.wMonth, local_time.wDay, local_time.wHour, local_time.wMinute, local_time.wSecond);
+ printf("Elapsed time: %02d:%02d:%02d", hours, minutes, seconds);
+ printf("\n");
+}
\ No newline at end of file
diff --git a/Pipe/threads.c b/Pipe/threads.c
index 620528c..83ffc84 100644
--- a/Pipe/threads.c
+++ b/Pipe/threads.c
@@ -4,64 +4,62 @@
#include
#include "ringbuffer.h"
-#include "pipe.h"
+#include "hub.h"
#define NUMBER_OF_ELEMENTS 1000
/* Integrate every element of the signal. */
-static void increment(pipe_t * const pipe)
+static void increment(hub_t * const hub)
{
- uint8_t check = 1;
- while (Pipe_IsFilled(pipe))
+ while (Hub_IsFilled(hub))
{
- uint32_t item = Pipe_Read(pipe);
+ uint32_t item = Hub_Read(hub);
// do something
- Pipe_Write(pipe, item);
+ Hub_Write(hub, item);
}
}
DWORD WINAPI Thread1(LPVOID lpParam)
{
- pipe_t * pipe = (pipe_t *)lpParam;
+ hub_t * hub = (hub_t *)lpParam;
/* Generate input */
for (int i = 0; i < NUMBER_OF_ELEMENTS; i++)
- Pipe_Insert(pipe, 42);
- increment(pipe);
+ Hub_Insert(hub, 42);
+ increment(hub);
- Pipe_Write(pipe, UINT32_MAX);
+ Hub_Write(hub, UINT32_MAX);
return 0;
}
DWORD WINAPI Thread2(LPVOID lpParam)
{
- pipe_t * pipe = (pipe_t *)lpParam;
+ hub_t * hub = (hub_t *)lpParam;
/* Generate input */
for (int i = 0; i < NUMBER_OF_ELEMENTS; i++)
- Pipe_Insert(pipe, 23);
- increment(pipe);
+ Hub_Insert(hub, 23);
+ increment(hub);
- Pipe_Write(pipe, UINT32_MAX);
+ Hub_Write(hub, UINT32_MAX);
return 0;
}
DWORD WINAPI Thread3(LPVOID lpParam)
{
- pipe_t * pipe = (pipe_t *)lpParam;
+ hub_t * hub = (hub_t *)lpParam;
uint32_t item = 0;
- uint8_t check = 1;
uint8_t symbol_counter = 0;
while (symbol_counter < 2)
{
- while (Pipe_IsFilled(pipe))
+ while (Hub_IsFilled(hub))
{
- item = Pipe_Read(pipe);
+ item = Hub_Read(hub);
if (item == UINT32_MAX)
{
@@ -72,26 +70,25 @@ DWORD WINAPI Thread3(LPVOID lpParam)
if (item != 42 && item != 23)
printf("failed\n");
- Pipe_Write(pipe, item);
+ Hub_Write(hub, item);
}
}
- Pipe_Write(pipe, UINT32_MAX);
+ Hub_Write(hub, UINT32_MAX);
return 0;
}
DWORD WINAPI Thread4(LPVOID lpParam)
{
- pipe_t * pipe = (pipe_t *)lpParam;
+ hub_t * hub = (hub_t *)lpParam;
uint32_t item = 0;
- uint8_t check = 1;
while (item != UINT32_MAX)
{
- while (Pipe_IsFilled(pipe))
+ while (Hub_IsFilled(hub))
{
- item = Pipe_Read(pipe);
+ item = Hub_Read(hub);
if (item == UINT32_MAX)
break;
@@ -99,26 +96,26 @@ DWORD WINAPI Thread4(LPVOID lpParam)
if (item != 42 && item != 23)
printf("failed\n");
- Pipe_Write(pipe, item);
+ Hub_Write(hub, item);
}
}
- Pipe_Write(pipe, UINT32_MAX);
+ Hub_Write(hub, UINT32_MAX);
return 0;
}
DWORD WINAPI Thread5(LPVOID lpParam)
{
- pipe_t * pipe = (pipe_t *)lpParam;
+ hub_t * hub = (hub_t *)lpParam;
uint32_t item = 0;
uint8_t check = 1;
while (item != UINT32_MAX)
{
- while (Pipe_IsFilled(pipe))
+ while (Hub_IsFilled(hub))
{
- item = Pipe_Read(pipe);
+ item = Hub_Read(hub);
if (item == 0 || item > 102)
{
check = 0;
@@ -127,25 +124,25 @@ DWORD WINAPI Thread5(LPVOID lpParam)
if (item == UINT32_MAX)
break;
- Pipe_Write(pipe, item);
+ Hub_Write(hub, item);
}
}
- Pipe_Write(pipe, UINT32_MAX);
+ Hub_Write(hub, UINT32_MAX);
return 0;
}
-void Run_Threads(pipe_t * pipe1, pipe_t * pipe2, pipe_t * pipe3, pipe_t * pipe4, pipe_t * pipe5)
+void Run_Threads(hub_t * hub1, hub_t * hub2, hub_t * hub3, hub_t * hub4, hub_t * hub5)
{
/* Create Threads */
HANDLE threadHandles[5];
- threadHandles[0] = CreateThread(NULL, 0, Thread1, pipe1, 0, NULL);
- threadHandles[1] = CreateThread(NULL, 0, Thread2, pipe2, 0, NULL);
- threadHandles[2] = CreateThread(NULL, 0, Thread3, pipe3, 0, NULL);
- threadHandles[3] = CreateThread(NULL, 0, Thread4, pipe4, 0, NULL);
- threadHandles[4] = CreateThread(NULL, 0, Thread5, pipe5, 0, NULL);
+ threadHandles[0] = CreateThread(NULL, 0, Thread1, hub1, 0, NULL);
+ threadHandles[1] = CreateThread(NULL, 0, Thread2, hub2, 0, NULL);
+ threadHandles[2] = CreateThread(NULL, 0, Thread3, hub3, 0, NULL);
+ threadHandles[3] = CreateThread(NULL, 0, Thread4, hub4, 0, NULL);
+ threadHandles[4] = CreateThread(NULL, 0, Thread5, hub5, 0, NULL);
WaitForMultipleObjects(5, threadHandles, TRUE, INFINITE);
@@ -200,33 +197,26 @@ uint8_t ringbuffer_valid_and_equal(ringbuffer_t * rb1, ringbuffer_t * rb2)
return 1;
}
-static void log(pipe_t * const source, pipe_t * const target, uint32_t element)
-{
- /*wchar_t text[64];
- swprintf_s(text, 64, L"%s: %d\n", source->name, element);
- OutputDebugStringW(text);*/
-}
-
void threads(uint32_t loops)
{
- /* Create pipes and connect them */
- Pipe_Create(increment_pipe1, 1, 1, NULL, log);
- Pipe_Create(increment_pipe2, 1, 1, NULL, log);
- Pipe_Create(increment_pipe3, 2, 2, NULL, log);
- Pipe_Create(increment_pipe4, 1, 1, NULL, log);
- Pipe_Create(increment_pipe5, 1, 1, NULL, log);
- Pipe_Create(check_pipe1, 1, 1, NULL, log);
- Pipe_Create(check_pipe2, 1, 1, NULL, log);
+ /* Create hubs and connect them */
+ Hub_Create(increment_hub1, 1, 1, NULL);
+ Hub_Create(increment_hub2, 1, 1, NULL);
+ Hub_Create(increment_hub3, 2, 2, NULL);
+ Hub_Create(increment_hub4, 1, 1, NULL);
+ Hub_Create(increment_hub5, 1, 1, NULL);
+ Hub_Create(check_hub1, 1, 1, NULL);
+ Hub_Create(check_hub2, 1, 1, NULL);
- Pipe_CreateInputBuffer(increment_pipe1, NUMBER_OF_ELEMENTS+2);
- Pipe_CreateInputBuffer(increment_pipe2, NUMBER_OF_ELEMENTS+2);
+ Hub_CreateInputBuffer(increment_hub1, NUMBER_OF_ELEMENTS+2);
+ Hub_CreateInputBuffer(increment_hub2, NUMBER_OF_ELEMENTS+2);
- Pipe_CreateConnection(increment_pipe1, increment_pipe3, NUMBER_OF_ELEMENTS+2);
- Pipe_CreateConnection(increment_pipe2, increment_pipe3, NUMBER_OF_ELEMENTS+2);
- Pipe_CreateConnection(increment_pipe3, increment_pipe4, NUMBER_OF_ELEMENTS * 2 + 2);
- Pipe_CreateConnection(increment_pipe3, increment_pipe5, NUMBER_OF_ELEMENTS * 2 + 2);
- Pipe_CreateConnection(increment_pipe4, check_pipe1, NUMBER_OF_ELEMENTS * 2 + 2);
- Pipe_CreateConnection(increment_pipe5, check_pipe2, NUMBER_OF_ELEMENTS * 2 + 2);
+ Hub_CreateConnection(increment_hub1, increment_hub3, NUMBER_OF_ELEMENTS+2);
+ Hub_CreateConnection(increment_hub2, increment_hub3, NUMBER_OF_ELEMENTS+2);
+ Hub_CreateConnection(increment_hub3, increment_hub4, NUMBER_OF_ELEMENTS * 2 + 2);
+ Hub_CreateConnection(increment_hub3, increment_hub5, NUMBER_OF_ELEMENTS * 2 + 2);
+ Hub_CreateConnection(increment_hub4, check_hub1, NUMBER_OF_ELEMENTS * 2 + 2);
+ Hub_CreateConnection(increment_hub5, check_hub2, NUMBER_OF_ELEMENTS * 2 + 2);
//HANDLE hFile;
//hFile = CreateFile("buffer.txt", GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
@@ -245,19 +235,19 @@ void threads(uint32_t loops)
QueryPerformanceCounter(&StartingTime);
uint32_t success_counter = 0;
- for (int i = 0; i < loops; i++)
+ for (uint32_t i = 0; i < loops; i++)
{
- Run_Threads(&increment_pipe1, &increment_pipe2, &increment_pipe3, &increment_pipe4, &increment_pipe5);
+ Run_Threads(&increment_hub1, &increment_hub2, &increment_hub3, &increment_hub4, &increment_hub5);
- //if (ringbuffer_valid_and_equal(check_pipe1.input_connector->connection[0], check_pipe2.input_connector->connection[0], hFile))
- if (ringbuffer_valid_and_equal(check_pipe1.input_connector->connection[0], check_pipe2.input_connector->connection[0]))
+ //if (ringbuffer_valid_and_equal(check_hub1.input_connector->connection[0], check_hub2.input_connector->connection[0], hFile))
+ if (ringbuffer_valid_and_equal(check_hub1.input_connector->connection[0], check_hub2.input_connector->connection[0]))
success_counter++;
}
QueryPerformanceCounter(&EndingTime);
ElapsedMicroseconds.QuadPart = EndingTime.QuadPart - StartingTime.QuadPart;
ElapsedMicroseconds.QuadPart *= 1000000;
ElapsedMicroseconds.QuadPart /= Frequency.QuadPart;
- int seconds = ElapsedMicroseconds.QuadPart / 1000000;
+ int seconds = (int)ElapsedMicroseconds.QuadPart / 1000000;
int hours = seconds / 3600;
int minutes = (seconds - (hours * 3600)) / 60;