From bb7ca9e423210a817a03caaf4c504132e21ff836 Mon Sep 17 00:00:00 2001 From: Andre Heber Date: Thu, 10 Mar 2016 23:50:10 +0100 Subject: [PATCH] Speed Test created. Pipe renamed to Hub. Log function deleted. Some minor changes for speed. --- Pipe/Pipe.vcxproj | 4 +- Pipe/Pipe.vcxproj.filters | 9 +- Pipe/hub.h | 145 ++++++++++++++++++++++++ Pipe/main.c | 125 +++++++++++---------- Pipe/pipe.h | 161 -------------------------- Pipe/ringbuffer.h | 34 ++++-- Pipe/speed.c | 230 ++++++++++++++++++++++++++++++++++++++ Pipe/threads.c | 120 +++++++++----------- 8 files changed, 528 insertions(+), 300 deletions(-) create mode 100644 Pipe/hub.h delete mode 100644 Pipe/pipe.h create mode 100644 Pipe/speed.c diff --git a/Pipe/Pipe.vcxproj b/Pipe/Pipe.vcxproj index df6dfbe..5e7bf28 100644 --- a/Pipe/Pipe.vcxproj +++ b/Pipe/Pipe.vcxproj @@ -103,6 +103,7 @@ true true true + Speed true @@ -111,11 +112,12 @@ + - + diff --git a/Pipe/Pipe.vcxproj.filters b/Pipe/Pipe.vcxproj.filters index e85778a..9c7a987 100644 --- a/Pipe/Pipe.vcxproj.filters +++ b/Pipe/Pipe.vcxproj.filters @@ -21,16 +21,19 @@ Source Files + + Source Files + - - Header Files - Header Files Header Files + + Header Files + \ No newline at end of file diff --git a/Pipe/hub.h b/Pipe/hub.h new file mode 100644 index 0000000..c54642a --- /dev/null +++ b/Pipe/hub.h @@ -0,0 +1,145 @@ +#ifndef HUB_H_ +#define HUB_H_ + +#include +#include "ringbuffer.h" +#include "connector.h" + +/* microsoft specific */ +#ifdef _MSC_VER +#define inline __inline +#endif + +typedef struct hub_tt +{ + connector_t * input_connector; + connector_t * output_connector; + void * state; +} hub_t; + + +/*******************************************/ +/* Functions to work with the hub system. */ +/*******************************************/ + +/* +Inserts a element into a hub. +So the signal can inserted to the hub system. +*/ +static inline void Hub_Insert(hub_t * const hub, uint32_t element) +{ + Connector_Insert(hub->input_connector, 0, element); +} + +/* +Read an element from the hub. +Used inside functions of the hub system. +*/ +static inline uint32_t Hub_Read(hub_t * hub) +{ + return Connector_Read(hub->input_connector); +} + +/* +Write an element to connected hubs. +Used inside functions of the hub system. +*/ +static inline void Hub_Write(hub_t * hub, uint32_t element) +{ + Connector_Write(hub->output_connector, element); +} + +/* +Check if the hub contents elements. +Used inside functions of the hub system. +*/ +static inline uint8_t Hub_IsFilled(hub_t * hub) +{ + return Connector_IsFilled(hub->input_connector); +} + +static inline uint8_t Hub_IsEmpty(hub_t * hub) +{ + return Connector_IsEmpty(hub->input_connector); +} + +/* +Check if the hub has no place left for new elements. +*/ +static inline uint8_t Hub_IsFull(const hub_t * hub) +{ + return Connector_IsFull(hub->output_connector); +} + + +/***************************************/ +/* Functions to construct hub system. */ +/***************************************/ + +#define Concat2(a, b) a ## b +#define Concat(a, b) Concat2(a, b) + +/* +Macro for the creation of a hub. +Automates the creation of a ring buffer and the hub. +arg_name is the variable name and string name of the hub. +arg_size is the ring buffer size in bytes. +arg_conn_count is the number of outgoing connections. +arg_state is the given state, which can be used in the function. +arg_log is the log function, called when an element is sent. +*/ +#define Hub_Create(name, input_connection_count, output_connection_count, state) \ +static Connector_Create(Concat(name, Concat(_input_connector, __LINE__)), input_connection_count); \ +static Connector_Create(Concat(name, Concat(_output_connector, __LINE__)), output_connection_count); \ +hub_t name; \ +Hub_Init(&name, &Concat(name, Concat(_input_connector, __LINE__)), &Concat(name, Concat(_output_connector, __LINE__)), state) + +/* +Initializes a hub. +A Ringbuffer is needed to store elements from other hubs. +A State (NULL if function has no state) for the function using the hub. +*/ +static inline void Hub_Init(hub_t * const hub, connector_t * const input_connector, connector_t * const output_connector, void * const state) +{ + hub->input_connector = input_connector; + hub->output_connector = output_connector; + hub->state = state; +} + +/* Connect two hubs. hub a sends elements to hub b. */ +static inline void Hub_Connect(hub_t * const source, hub_t * const target, ringbuffer_t * ring_buffer) +{ + Connector_Add(source->output_connector, ring_buffer); + Connector_Add(target->input_connector, ring_buffer); +} + +#define Hub_CreateConnection(source, target, size) \ +RingBuffer_Create(Concat(connector_ring_buffer, __LINE__), size); \ +Hub_Connect(&source, &target, &Concat(connector_ring_buffer, __LINE__)) + + +/* +If you want to connect the hub with a ring buffer, use Hub_ConnectInputWithRingBuffer +or Hub_ConnectOutputWithRingBuffer. +*/ + +static inline void Hub_ConnectInputWithRingBuffer(hub_t * const hub, ringbuffer_t * ring_buffer) +{ + Connector_Add(hub->input_connector, ring_buffer); +} + +static inline void Hub_ConnectOutputWithRingBuffer(hub_t * const hub, ringbuffer_t * ring_buffer) +{ + Connector_Add(hub->output_connector, ring_buffer); +} + +static inline void Hub_AddInputBuffer(hub_t * const hub, ringbuffer_t * input_buffer) +{ + Connector_Add(hub->input_connector, input_buffer); +} + +#define Hub_CreateInputBuffer(hub_name, size) \ +RingBuffer_Create(Concat(connector_ring_buffer, __LINE__), size); \ +Hub_AddInputBuffer(&hub_name, &Concat(connector_ring_buffer, __LINE__)) + +#endif diff --git a/Pipe/main.c b/Pipe/main.c index c5c0784..b35fd90 100644 --- a/Pipe/main.c +++ b/Pipe/main.c @@ -5,88 +5,88 @@ #include "ringbuffer.h" #include "connector.h" -#include "pipe.h" +#include "hub.h" #ifdef _MSC_VER #define inline __inline #endif /* Integrate every element of the signal. */ -void increment(pipe_t * const p) +void increment(hub_t * const p) { - while (Pipe_IsFilled(p)) + while (Hub_IsFilled(p)) { - uint32_t item = Pipe_Read(p); + uint32_t item = Hub_Read(p); item++; - Pipe_Write(p, item); + Hub_Write(p, item); } } /* Square every element of the signal. */ -void square(pipe_t * const p) +void square(hub_t * const p) { - while (Pipe_IsFilled(p)) + while (Hub_IsFilled(p)) { - uint32_t item = Pipe_Read(p); + uint32_t item = Hub_Read(p); item = item * item; - Pipe_Write(p, item); + Hub_Write(p, item); } } /* Integrate over every element of the signal. */ -void integrate(pipe_t * const pipe) +void integrate(hub_t * const hub) { - uint32_t state = *((uint32_t*)pipe->state); + uint32_t state = *((uint32_t*)hub->state); - while (Pipe_IsFilled(pipe)) + while (Hub_IsFilled(hub)) { - uint32_t item = Pipe_Read(pipe); + uint32_t item = Hub_Read(hub); state = state + item; - Pipe_Write(pipe, state); + Hub_Write(hub, state); } - *((uint32_t*)pipe->state) = state; + *((uint32_t*)hub->state) = state; } /* Build the sum of all elements of the signal. */ -void sum(pipe_t * const pipe) +void sum(hub_t * const hub) { uint32_t sum = 0; - while (Pipe_IsFilled(pipe)) - sum += Pipe_Read(pipe); - Pipe_Write(pipe, sum); + while (Hub_IsFilled(hub)) + sum += Hub_Read(hub); + Hub_Write(hub, sum); } /* Build the average of all elements of the signal. */ -void average(pipe_t * const pipe) +void average(hub_t * const hub) { uint32_t sum = 0; uint32_t element_counter = 0; uint32_t average = 0; - while (Pipe_IsFilled(pipe)) + while (Hub_IsFilled(hub)) { - sum += Pipe_Read(pipe); + sum += Hub_Read(hub); element_counter++; } average = sum / element_counter; - Pipe_Write(pipe, average); + Hub_Write(hub, average); } /* Print the signal. */ -void print(pipe_t * const pipe) +void print(hub_t * const hub) { printf("\nOutput:\n"); - while (Pipe_IsFilled(pipe)) - printf("%d\n", Pipe_Read(pipe)); + while (Hub_IsFilled(hub)) + printf("%d\n", Hub_Read(hub)); } /* Logging function. Set by user. */ -void log(pipe_t * const source, pipe_t * const target, uint32_t element) -{ - //if (Pipe_IsFull(target)) - // printf("Error: Pipe %s is full!\n", target->name); +//void log(hub_t * const source, hub_t * const target, uint32_t element) +//{ + //if (Hub_IsFull(target)) + // printf("Error: Hub %s is full!\n", target->name); //if (source->state == NULL && target->state == NULL) // printf("%s -> %d -> %s\n", source->name, element, target->name); @@ -97,10 +97,11 @@ void log(pipe_t * const source, pipe_t * const target, uint32_t element) //else // printf("%s -> %d -> %s(%d)\n", source->name, element, target->name, *((uint32_t*)target->state)); - printf("%s: %d\n", source->name, element); -} + //printf("%s: %d\n", source->name, element); +//} extern void threads(uint32_t loops); +extern void speed(uint32_t items_count); int main(int argc, char *argv[]) { @@ -108,47 +109,51 @@ int main(int argc, char *argv[]) if (argc == 1) { - /* Create pipes and connect them */ - Pipe_Create(increment_pipe, 1, 1, NULL, log); - Pipe_Create(square_pipe, 1, 1, NULL, log); - Pipe_Create(integrate_pipe, 2, 2, &counter, log); - Pipe_Create(sum_pipe, 1, 1, NULL, log); - Pipe_Create(average_pipe, 1, 1, NULL, log); - Pipe_Create(print_pipe, 2, 1, NULL, log); + /* Create hubs and connect them */ + Hub_Create(increment_hub, 1, 1, NULL); + Hub_Create(square_hub, 1, 1, NULL); + Hub_Create(integrate_hub, 2, 2, &counter); + Hub_Create(sum_hub, 1, 1, NULL); + Hub_Create(average_hub, 1, 1, NULL); + Hub_Create(print_hub, 2, 1, NULL); - Pipe_CreateInputBuffer(increment_pipe, 4); - Pipe_CreateInputBuffer(square_pipe, 4); + Hub_CreateInputBuffer(increment_hub, 4); + Hub_CreateInputBuffer(square_hub, 4); - Pipe_CreateConnection(increment_pipe, integrate_pipe, 4); - Pipe_CreateConnection(square_pipe, integrate_pipe, 4); - Pipe_CreateConnection(integrate_pipe, sum_pipe, 8); - Pipe_CreateConnection(integrate_pipe, average_pipe, 8); - Pipe_CreateConnection(sum_pipe, print_pipe, 4); - Pipe_CreateConnection(average_pipe, print_pipe, 4); + Hub_CreateConnection(increment_hub, integrate_hub, 4); + Hub_CreateConnection(square_hub, integrate_hub, 4); + Hub_CreateConnection(integrate_hub, sum_hub, 8); + Hub_CreateConnection(integrate_hub, average_hub, 8); + Hub_CreateConnection(sum_hub, print_hub, 4); + Hub_CreateConnection(average_hub, print_hub, 4); /* Create Input */ - Pipe_Insert(&increment_pipe, 1); - Pipe_Insert(&increment_pipe, 3); - Pipe_Insert(&increment_pipe, 5); + Hub_Insert(&increment_hub, 1); + Hub_Insert(&increment_hub, 3); + Hub_Insert(&increment_hub, 5); - Pipe_Insert(&square_pipe, 2); - Pipe_Insert(&square_pipe, 4); - Pipe_Insert(&square_pipe, 6); + Hub_Insert(&square_hub, 2); + Hub_Insert(&square_hub, 4); + Hub_Insert(&square_hub, 6); /* run the functions (each can run in an own thread) */ - increment(&increment_pipe); - square(&square_pipe); - integrate(&integrate_pipe); - sum(&sum_pipe); - average(&average_pipe); - print(&print_pipe); + increment(&increment_hub); + square(&square_hub); + integrate(&integrate_hub); + sum(&sum_hub); + average(&average_hub); + print(&print_hub); getchar(); } - else + else if (argc == 2) { threads(atoi(argv[1])); } + else + { + speed(atoi(argv[2])); + } return 0; } diff --git a/Pipe/pipe.h b/Pipe/pipe.h deleted file mode 100644 index fba80e8..0000000 --- a/Pipe/pipe.h +++ /dev/null @@ -1,161 +0,0 @@ -#ifndef PIPE_H_ -#define PIPE_H_ - -#include -#include "ringbuffer.h" -#include "connector.h" - -/* microsoft specific */ -#ifdef _MSC_VER -#define inline __inline -#endif - -typedef struct pipe_tt -{ - connector_t * input_connector; - connector_t * output_connector; - void * state; - char * name; - void(*log_function)(struct pipe_tt * from, struct pipe_tt * to, uint32_t elem); -} pipe_t; - - -/*******************************************/ -/* Functions to work with the pipe system. */ -/*******************************************/ - -/* -Inserts a element into a pipe. -So the signal can inserted to the pipe system. -*/ -static inline void Pipe_Insert(pipe_t * const pipe, uint32_t element) -{ - Connector_Insert(pipe->input_connector, 0, element); -} - -/* -Read an element from the pipe. -Used inside functions of the pipe system. -*/ -static inline uint32_t Pipe_Read(pipe_t * pipe) -{ - return Connector_Read(pipe->input_connector); -} - -/* -Write an element to connected pipes. -Used inside functions of the pipe system. -*/ -static inline void Pipe_Write(pipe_t * pipe, uint32_t element) -{ - pipe->log_function(pipe, NULL, element); - Connector_Write(pipe->output_connector, element); -} - -/* -Check if the pipe contents elements. -Used inside functions of the pipe system. -*/ -static inline uint8_t Pipe_IsFilled(pipe_t * pipe) -{ - return Connector_IsFilled(pipe->input_connector); -} - -/* -Check if the pipe has no place left for new elements. -Usefull for the logging. -*/ -static inline uint8_t Pipe_IsFull(const pipe_t * pipe) -{ - return Connector_IsFull(pipe->input_connector); -} - -/* -Default Logging function. -Shall be set by user. -*/ -static inline void Pipe_Log(pipe_t * const source, pipe_t * const target, uint32_t element) -{ - if (Pipe_IsFull(target)) - printf("Error: Pipe %s is full!\n", target->name); - - if (source->state == NULL && target->state == NULL) - printf("%s -> %d -> %s\n", source->name, element, target->name); - else if (source->state != NULL && target->state != NULL) - printf("%s(%d) -> %d -> %s(%d)\n", source->name, *((uint32_t*)source->state), element, target->name, *((uint32_t*)target->state)); - else if (source->state != NULL) - printf("%s(%d) -> %d -> %s\n", source->name, *((uint32_t*)source->state), element, target->name); - else - printf("%s -> %d -> %s(%d)\n", source->name, element, target->name, *((uint32_t*)target->state)); -} - - -/***************************************/ -/* Functions to construct pipe system. */ -/***************************************/ - -#define Concat2(a, b) a ## b -#define Concat(a, b) Concat2(a, b) - -/* -Macro for the creation of a pipe. -Automates the creation of a ring buffer and the pipe. -arg_name is the variable name and string name of the pipe. -arg_size is the ring buffer size in bytes. -arg_conn_count is the number of outgoing connections. -arg_state is the given state, which can be used in the function. -arg_log is the log function, called when an element is sent. -*/ -#define Pipe_Create(name, input_connection_count, output_connection_count, state, log) \ -static Connector_Create(Concat(name, Concat(_input_connector, __LINE__)), input_connection_count); \ -static Connector_Create(Concat(name, Concat(_output_connector, __LINE__)), output_connection_count); \ -pipe_t name; \ -Pipe_Init(&name, &Concat(name, Concat(_input_connector, __LINE__)), &Concat(name, Concat(_output_connector, __LINE__)), state, #name, log) - -/* -Initializes a pipe. -A Ringbuffer is needed to store elements from other pipes. -A State (NULL if function has no state) for the function using the pipe. -A Name and a logging function are usefull to track the dataflow. -*/ -static inline void Pipe_Init( - pipe_t * const pipe, - connector_t * input_connector, - connector_t * output_connector, - void * state, - char * const name, - void(*log_function)(struct pipe_tt * source, struct pipe_tt * target, uint32_t element) - ) -{ - pipe->input_connector = input_connector; - pipe->output_connector = output_connector; - pipe->state = state; - pipe->name = name; - - if (log_function == NULL) - pipe->log_function = Pipe_Log; - else - pipe->log_function = log_function; -} - -/* Connect two pipes. Pipe a sends elements to pipe b. */ -static inline void Pipe_Connect(pipe_t * const source, pipe_t * const target, ringbuffer_t * ring_buffer) -{ - Connector_Add(source->output_connector, ring_buffer); - Connector_Add(target->input_connector, ring_buffer); -} - -#define Pipe_CreateConnection(source, target, size) \ -RingBuffer_Create(Concat(connector_ring_buffer, __LINE__), size); \ -Pipe_Connect(&source, &target, &Concat(connector_ring_buffer, __LINE__)) - -static inline void Pipe_AddInputBuffer(pipe_t * const pipe, ringbuffer_t * input_buffer) -{ - Connector_Add(pipe->input_connector, input_buffer); -} - -#define Pipe_CreateInputBuffer(pipe_name, size) \ -RingBuffer_Create(Concat(connector_ring_buffer, __LINE__), size); \ -Pipe_AddInputBuffer(&pipe_name, &Concat(connector_ring_buffer, __LINE__)) - -#endif diff --git a/Pipe/ringbuffer.h b/Pipe/ringbuffer.h index faeefdf..1a71519 100644 --- a/Pipe/ringbuffer.h +++ b/Pipe/ringbuffer.h @@ -7,6 +7,12 @@ #define inline __inline #endif +/* +A thread-safe ring buffer, which uses an pre-declared array as buffer. +But you have to declare a buffer with one more item than you really need. +For instance, if you need a ring buffer for 10 items, the array must be declared for 11 items. +*/ + typedef struct { uint32_t * reader; uint32_t * writer; @@ -19,9 +25,6 @@ static inline void RingBuffer_Init(ringbuffer_t * const ring_buffer, uint32_t * { ring_buffer->start = ring_buffer->reader = ring_buffer->writer = &array[0]; ring_buffer->end = &array[0] + size - 1; - - for (uint32_t i = 0; i < size; i++) - array[i] = 0; } static inline uint32_t * RingBuffer_NextAddress(ringbuffer_t * const ring_buffer, uint32_t * const pointer) @@ -51,7 +54,6 @@ static inline uint8_t RingBuffer_IsFilled(ringbuffer_t * const ring_buffer) return !RingBuffer_IsEmpty(ring_buffer); } -/* Write element into RingBuffer. */ static inline void RingBuffer_Write(ringbuffer_t * const ring_buffer, uint32_t element) { if (!RingBuffer_IsFull(ring_buffer)) @@ -61,7 +63,6 @@ static inline void RingBuffer_Write(ringbuffer_t * const ring_buffer, uint32_t e } } -/* Read value from RingBuffer. */ static inline uint32_t RingBuffer_Read(ringbuffer_t * const ring_buffer) { uint32_t element = 0; @@ -69,19 +70,32 @@ static inline uint32_t RingBuffer_Read(ringbuffer_t * const ring_buffer) if (!RingBuffer_IsEmpty(ring_buffer)) { element = *(ring_buffer->reader); - *(ring_buffer->reader) = 30; ring_buffer->reader = RingBuffer_NextAddress(ring_buffer, ring_buffer->reader); } return element; } + +#define Concat2(a, b) a ## b +#define Concat(a, b) Concat2(a, b) +#define UniqueBuffer(name) Concat(name, Concat(_buffer, __LINE__)) + /* Macro for the creation of a ring buffer. */ -#define RingBuffer_Create(name, size) \ -uint32_t Concat(name, Concat(_buffer, __LINE__))[size]; \ -ringbuffer_t name; \ -RingBuffer_Init(&name, Concat(name, Concat(_buffer, __LINE__)), size) \ +#define RingBuffer_Create(name, size) \ +uint32_t UniqueBuffer(name)[size]; \ +ringbuffer_t name; \ +RingBuffer_Init(&name, UniqueBuffer(name), size) \ + +#define RingBuffer(name, size) \ +uint32_t UniqueBuffer(name)[size]; \ +ringbuffer_t name = { \ + .reader = &UniqueBuffer(name)[0], \ + .writer = &UniqueBuffer(name)[0], \ + .start = &UniqueBuffer(name)[0], \ + .end = &UniqueBuffer(name)[0] + sizeof(UniqueBuffer(name)) / sizeof(UniqueBuffer(name)[0]) \ +} #endif diff --git a/Pipe/speed.c b/Pipe/speed.c new file mode 100644 index 0000000..1d35a3c --- /dev/null +++ b/Pipe/speed.c @@ -0,0 +1,230 @@ +#include +#include +#include +#include + +#include "ringbuffer.h" +#include "hub.h" + +uint32_t counter; + +DWORD WINAPI Speed_Thread1(LPVOID lpParam) +{ + hub_t * hub = (hub_t *)lpParam; + + /* Generate input */ + for (uint32_t i = 0; i < counter; i++) + { + while(Hub_IsFull(hub)) {} + Hub_Write(hub, i); + } + while (Hub_IsFull(hub)) {} + Hub_Write(hub, UINT32_MAX); + + return 0; +} + +DWORD WINAPI Speed_Thread2(LPVOID lpParam) +{ + hub_t * hub = (hub_t *)lpParam; + uint32_t item = 0; + + while (item != UINT32_MAX) + { + if (Hub_IsFilled(hub)) + { + item = Hub_Read(hub); + //printf("%d\n", item); + while (Hub_IsFull(hub)) {} + Hub_Write(hub, item); + } + } + + return 0; +} + +DWORD WINAPI Speed_Thread3(LPVOID lpParam) +{ + hub_t * hub = (hub_t *)lpParam; + uint32_t item = 0; + + while (item != UINT32_MAX) + { + if (Hub_IsFilled(hub)) + { + item = Hub_Read(hub); + while (Hub_IsFull(hub)) {} + Hub_Write(hub, item); + } + } + + return 0; +} + +DWORD WINAPI Speed_Thread4(LPVOID lpParam) +{ + hub_t * hub = (hub_t *)lpParam; + uint32_t item = 0; + + while (item != UINT32_MAX) + { + if (Hub_IsFilled(hub)) + { + item = Hub_Read(hub); + while (Hub_IsFull(hub)) {} + Hub_Write(hub, item); + } + } + + return 0; +} + +DWORD WINAPI Speed_Thread5(LPVOID lpParam) +{ + hub_t * hub = (hub_t *)lpParam; + uint32_t item = 0; + + while (item != UINT32_MAX) + { + if (Hub_IsFilled(hub)) + { + item = Hub_Read(hub); + while (Hub_IsFull(hub)) {} + Hub_Write(hub, item); + } + } + + return 0; +} + +DWORD WINAPI Speed_Thread6(LPVOID lpParam) +{ + hub_t * hub = (hub_t *)lpParam; + uint32_t item = 0; + + while (item != UINT32_MAX) + { + if (Hub_IsFilled(hub)) + { + item = Hub_Read(hub); + while (Hub_IsFull(hub)) {} + Hub_Write(hub, item); + } + } + + return 0; +} + +DWORD WINAPI Speed_Thread7(LPVOID lpParam) +{ + hub_t * hub = (hub_t *)lpParam; + uint32_t item = 0; + + while (item != UINT32_MAX) + { + if (Hub_IsFilled(hub)) + { + item = Hub_Read(hub); + while (Hub_IsFull(hub)) {} + Hub_Write(hub, item); + } + } + + return 0; +} + +DWORD WINAPI Speed_Thread8(LPVOID lpParam) +{ + hub_t * hub = (hub_t *)lpParam; + uint32_t item = 0; + + while (item != UINT32_MAX) + { + if (Hub_IsFilled(hub)) + { + item = Hub_Read(hub); + } + } + + return 0; +} + +void speed(uint32_t items_count) +{ + counter = items_count; + + /* Create hubs and connect them */ + Hub_Create(hub1, 1, 1, NULL); + Hub_Create(hub2, 1, 1, NULL); + Hub_Create(hub3, 1, 1, NULL); + Hub_Create(hub4, 1, 1, NULL); + Hub_Create(hub5, 1, 1, NULL); + Hub_Create(hub6, 1, 1, NULL); + Hub_Create(hub7, 1, 1, NULL); + Hub_Create(hub8, 1, 1, NULL); + +#define RINGBUFFER_SIZE 1024 + Hub_CreateConnection(hub1, hub2, RINGBUFFER_SIZE); + Hub_CreateConnection(hub2, hub3, RINGBUFFER_SIZE); + Hub_CreateConnection(hub3, hub4, RINGBUFFER_SIZE); + Hub_CreateConnection(hub4, hub5, RINGBUFFER_SIZE); + Hub_CreateConnection(hub5, hub6, RINGBUFFER_SIZE); + Hub_CreateConnection(hub6, hub7, RINGBUFFER_SIZE); + Hub_CreateConnection(hub7, hub8, RINGBUFFER_SIZE); + + SYSTEMTIME local_time; + GetLocalTime(&local_time); + + printf("\n"); + printf("Starting Speed Test\n"); + printf("at %02d/%02d/%02d %02d:%02d:%02d\n", local_time.wYear, local_time.wMonth, local_time.wDay, local_time.wHour, local_time.wMinute, local_time.wSecond); + printf("\n"); + + LARGE_INTEGER Frequency; + LARGE_INTEGER StartingTime, EndingTime, ElapsedMicroseconds; + QueryPerformanceFrequency(&Frequency); + QueryPerformanceCounter(&StartingTime); + + + /* Create Threads */ + HANDLE threadHandles[8]; + + threadHandles[0] = CreateThread(NULL, 0, Speed_Thread1, &hub1, 0, NULL); + threadHandles[1] = CreateThread(NULL, 0, Speed_Thread2, &hub2, 0, NULL); + threadHandles[2] = CreateThread(NULL, 0, Speed_Thread3, &hub3, 0, NULL); + threadHandles[3] = CreateThread(NULL, 0, Speed_Thread4, &hub4, 0, NULL); + threadHandles[4] = CreateThread(NULL, 0, Speed_Thread5, &hub5, 0, NULL); + threadHandles[5] = CreateThread(NULL, 0, Speed_Thread6, &hub6, 0, NULL); + threadHandles[6] = CreateThread(NULL, 0, Speed_Thread7, &hub7, 0, NULL); + threadHandles[7] = CreateThread(NULL, 0, Speed_Thread8, &hub8, 0, NULL); + + WaitForMultipleObjects(8, threadHandles, TRUE, INFINITE); + + CloseHandle(threadHandles[0]); + CloseHandle(threadHandles[1]); + CloseHandle(threadHandles[2]); + CloseHandle(threadHandles[3]); + CloseHandle(threadHandles[4]); + CloseHandle(threadHandles[5]); + CloseHandle(threadHandles[6]); + CloseHandle(threadHandles[7]); + + QueryPerformanceCounter(&EndingTime); + ElapsedMicroseconds.QuadPart = EndingTime.QuadPart - StartingTime.QuadPart; + ElapsedMicroseconds.QuadPart *= 1000000; + ElapsedMicroseconds.QuadPart /= Frequency.QuadPart; + int seconds = (int)ElapsedMicroseconds.QuadPart / 1000000; + + int hours = seconds / 3600; + int minutes = (seconds - (hours * 3600)) / 60; + seconds = seconds - (minutes * 60); + + printf("Count of Concurrent Data Operations: %d\n", items_count * 14); + + printf("\n"); + GetLocalTime(&local_time); + printf("Test finished\n"); + printf("at %02d/%02d/%02d %02d:%02d:%02d\n", local_time.wYear, local_time.wMonth, local_time.wDay, local_time.wHour, local_time.wMinute, local_time.wSecond); + printf("Elapsed time: %02d:%02d:%02d", hours, minutes, seconds); + printf("\n"); +} \ No newline at end of file diff --git a/Pipe/threads.c b/Pipe/threads.c index 620528c..83ffc84 100644 --- a/Pipe/threads.c +++ b/Pipe/threads.c @@ -4,64 +4,62 @@ #include #include "ringbuffer.h" -#include "pipe.h" +#include "hub.h" #define NUMBER_OF_ELEMENTS 1000 /* Integrate every element of the signal. */ -static void increment(pipe_t * const pipe) +static void increment(hub_t * const hub) { - uint8_t check = 1; - while (Pipe_IsFilled(pipe)) + while (Hub_IsFilled(hub)) { - uint32_t item = Pipe_Read(pipe); + uint32_t item = Hub_Read(hub); // do something - Pipe_Write(pipe, item); + Hub_Write(hub, item); } } DWORD WINAPI Thread1(LPVOID lpParam) { - pipe_t * pipe = (pipe_t *)lpParam; + hub_t * hub = (hub_t *)lpParam; /* Generate input */ for (int i = 0; i < NUMBER_OF_ELEMENTS; i++) - Pipe_Insert(pipe, 42); - increment(pipe); + Hub_Insert(hub, 42); + increment(hub); - Pipe_Write(pipe, UINT32_MAX); + Hub_Write(hub, UINT32_MAX); return 0; } DWORD WINAPI Thread2(LPVOID lpParam) { - pipe_t * pipe = (pipe_t *)lpParam; + hub_t * hub = (hub_t *)lpParam; /* Generate input */ for (int i = 0; i < NUMBER_OF_ELEMENTS; i++) - Pipe_Insert(pipe, 23); - increment(pipe); + Hub_Insert(hub, 23); + increment(hub); - Pipe_Write(pipe, UINT32_MAX); + Hub_Write(hub, UINT32_MAX); return 0; } DWORD WINAPI Thread3(LPVOID lpParam) { - pipe_t * pipe = (pipe_t *)lpParam; + hub_t * hub = (hub_t *)lpParam; uint32_t item = 0; - uint8_t check = 1; uint8_t symbol_counter = 0; while (symbol_counter < 2) { - while (Pipe_IsFilled(pipe)) + while (Hub_IsFilled(hub)) { - item = Pipe_Read(pipe); + item = Hub_Read(hub); if (item == UINT32_MAX) { @@ -72,26 +70,25 @@ DWORD WINAPI Thread3(LPVOID lpParam) if (item != 42 && item != 23) printf("failed\n"); - Pipe_Write(pipe, item); + Hub_Write(hub, item); } } - Pipe_Write(pipe, UINT32_MAX); + Hub_Write(hub, UINT32_MAX); return 0; } DWORD WINAPI Thread4(LPVOID lpParam) { - pipe_t * pipe = (pipe_t *)lpParam; + hub_t * hub = (hub_t *)lpParam; uint32_t item = 0; - uint8_t check = 1; while (item != UINT32_MAX) { - while (Pipe_IsFilled(pipe)) + while (Hub_IsFilled(hub)) { - item = Pipe_Read(pipe); + item = Hub_Read(hub); if (item == UINT32_MAX) break; @@ -99,26 +96,26 @@ DWORD WINAPI Thread4(LPVOID lpParam) if (item != 42 && item != 23) printf("failed\n"); - Pipe_Write(pipe, item); + Hub_Write(hub, item); } } - Pipe_Write(pipe, UINT32_MAX); + Hub_Write(hub, UINT32_MAX); return 0; } DWORD WINAPI Thread5(LPVOID lpParam) { - pipe_t * pipe = (pipe_t *)lpParam; + hub_t * hub = (hub_t *)lpParam; uint32_t item = 0; uint8_t check = 1; while (item != UINT32_MAX) { - while (Pipe_IsFilled(pipe)) + while (Hub_IsFilled(hub)) { - item = Pipe_Read(pipe); + item = Hub_Read(hub); if (item == 0 || item > 102) { check = 0; @@ -127,25 +124,25 @@ DWORD WINAPI Thread5(LPVOID lpParam) if (item == UINT32_MAX) break; - Pipe_Write(pipe, item); + Hub_Write(hub, item); } } - Pipe_Write(pipe, UINT32_MAX); + Hub_Write(hub, UINT32_MAX); return 0; } -void Run_Threads(pipe_t * pipe1, pipe_t * pipe2, pipe_t * pipe3, pipe_t * pipe4, pipe_t * pipe5) +void Run_Threads(hub_t * hub1, hub_t * hub2, hub_t * hub3, hub_t * hub4, hub_t * hub5) { /* Create Threads */ HANDLE threadHandles[5]; - threadHandles[0] = CreateThread(NULL, 0, Thread1, pipe1, 0, NULL); - threadHandles[1] = CreateThread(NULL, 0, Thread2, pipe2, 0, NULL); - threadHandles[2] = CreateThread(NULL, 0, Thread3, pipe3, 0, NULL); - threadHandles[3] = CreateThread(NULL, 0, Thread4, pipe4, 0, NULL); - threadHandles[4] = CreateThread(NULL, 0, Thread5, pipe5, 0, NULL); + threadHandles[0] = CreateThread(NULL, 0, Thread1, hub1, 0, NULL); + threadHandles[1] = CreateThread(NULL, 0, Thread2, hub2, 0, NULL); + threadHandles[2] = CreateThread(NULL, 0, Thread3, hub3, 0, NULL); + threadHandles[3] = CreateThread(NULL, 0, Thread4, hub4, 0, NULL); + threadHandles[4] = CreateThread(NULL, 0, Thread5, hub5, 0, NULL); WaitForMultipleObjects(5, threadHandles, TRUE, INFINITE); @@ -200,33 +197,26 @@ uint8_t ringbuffer_valid_and_equal(ringbuffer_t * rb1, ringbuffer_t * rb2) return 1; } -static void log(pipe_t * const source, pipe_t * const target, uint32_t element) -{ - /*wchar_t text[64]; - swprintf_s(text, 64, L"%s: %d\n", source->name, element); - OutputDebugStringW(text);*/ -} - void threads(uint32_t loops) { - /* Create pipes and connect them */ - Pipe_Create(increment_pipe1, 1, 1, NULL, log); - Pipe_Create(increment_pipe2, 1, 1, NULL, log); - Pipe_Create(increment_pipe3, 2, 2, NULL, log); - Pipe_Create(increment_pipe4, 1, 1, NULL, log); - Pipe_Create(increment_pipe5, 1, 1, NULL, log); - Pipe_Create(check_pipe1, 1, 1, NULL, log); - Pipe_Create(check_pipe2, 1, 1, NULL, log); + /* Create hubs and connect them */ + Hub_Create(increment_hub1, 1, 1, NULL); + Hub_Create(increment_hub2, 1, 1, NULL); + Hub_Create(increment_hub3, 2, 2, NULL); + Hub_Create(increment_hub4, 1, 1, NULL); + Hub_Create(increment_hub5, 1, 1, NULL); + Hub_Create(check_hub1, 1, 1, NULL); + Hub_Create(check_hub2, 1, 1, NULL); - Pipe_CreateInputBuffer(increment_pipe1, NUMBER_OF_ELEMENTS+2); - Pipe_CreateInputBuffer(increment_pipe2, NUMBER_OF_ELEMENTS+2); + Hub_CreateInputBuffer(increment_hub1, NUMBER_OF_ELEMENTS+2); + Hub_CreateInputBuffer(increment_hub2, NUMBER_OF_ELEMENTS+2); - Pipe_CreateConnection(increment_pipe1, increment_pipe3, NUMBER_OF_ELEMENTS+2); - Pipe_CreateConnection(increment_pipe2, increment_pipe3, NUMBER_OF_ELEMENTS+2); - Pipe_CreateConnection(increment_pipe3, increment_pipe4, NUMBER_OF_ELEMENTS * 2 + 2); - Pipe_CreateConnection(increment_pipe3, increment_pipe5, NUMBER_OF_ELEMENTS * 2 + 2); - Pipe_CreateConnection(increment_pipe4, check_pipe1, NUMBER_OF_ELEMENTS * 2 + 2); - Pipe_CreateConnection(increment_pipe5, check_pipe2, NUMBER_OF_ELEMENTS * 2 + 2); + Hub_CreateConnection(increment_hub1, increment_hub3, NUMBER_OF_ELEMENTS+2); + Hub_CreateConnection(increment_hub2, increment_hub3, NUMBER_OF_ELEMENTS+2); + Hub_CreateConnection(increment_hub3, increment_hub4, NUMBER_OF_ELEMENTS * 2 + 2); + Hub_CreateConnection(increment_hub3, increment_hub5, NUMBER_OF_ELEMENTS * 2 + 2); + Hub_CreateConnection(increment_hub4, check_hub1, NUMBER_OF_ELEMENTS * 2 + 2); + Hub_CreateConnection(increment_hub5, check_hub2, NUMBER_OF_ELEMENTS * 2 + 2); //HANDLE hFile; //hFile = CreateFile("buffer.txt", GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); @@ -245,19 +235,19 @@ void threads(uint32_t loops) QueryPerformanceCounter(&StartingTime); uint32_t success_counter = 0; - for (int i = 0; i < loops; i++) + for (uint32_t i = 0; i < loops; i++) { - Run_Threads(&increment_pipe1, &increment_pipe2, &increment_pipe3, &increment_pipe4, &increment_pipe5); + Run_Threads(&increment_hub1, &increment_hub2, &increment_hub3, &increment_hub4, &increment_hub5); - //if (ringbuffer_valid_and_equal(check_pipe1.input_connector->connection[0], check_pipe2.input_connector->connection[0], hFile)) - if (ringbuffer_valid_and_equal(check_pipe1.input_connector->connection[0], check_pipe2.input_connector->connection[0])) + //if (ringbuffer_valid_and_equal(check_hub1.input_connector->connection[0], check_hub2.input_connector->connection[0], hFile)) + if (ringbuffer_valid_and_equal(check_hub1.input_connector->connection[0], check_hub2.input_connector->connection[0])) success_counter++; } QueryPerformanceCounter(&EndingTime); ElapsedMicroseconds.QuadPart = EndingTime.QuadPart - StartingTime.QuadPart; ElapsedMicroseconds.QuadPart *= 1000000; ElapsedMicroseconds.QuadPart /= Frequency.QuadPart; - int seconds = ElapsedMicroseconds.QuadPart / 1000000; + int seconds = (int)ElapsedMicroseconds.QuadPart / 1000000; int hours = seconds / 3600; int minutes = (seconds - (hours * 3600)) / 60;