diff --git a/Pipe/Pipe.vcxproj b/Pipe/Pipe.vcxproj index 67b3c20..14739c2 100644 --- a/Pipe/Pipe.vcxproj +++ b/Pipe/Pipe.vcxproj @@ -111,6 +111,7 @@ + diff --git a/Pipe/Pipe.vcxproj.filters b/Pipe/Pipe.vcxproj.filters index 763f921..228fb25 100644 --- a/Pipe/Pipe.vcxproj.filters +++ b/Pipe/Pipe.vcxproj.filters @@ -18,6 +18,9 @@ Source Files + + Source Files + diff --git a/Pipe/main.c b/Pipe/main.c index 2c530f5..99fd2a3 100644 --- a/Pipe/main.c +++ b/Pipe/main.c @@ -7,7 +7,7 @@ #include "pipe.h" #ifdef _MSC_VER - #define inline __inline +#define inline __inline #endif /* Integrate every element of the signal. */ @@ -35,16 +35,16 @@ void square(pipe_t * const p) /* Integrate over every element of the signal. */ void integrate(pipe_t * const pipe) { - uint32_t state = *((uint32_t*)pipe->state); + uint32_t state = *((uint32_t*)pipe->state); - while (Pipe_isFilled(pipe)) + while (Pipe_isFilled(pipe)) { - uint32_t item = Pipe_Read(pipe); + uint32_t item = Pipe_Read(pipe); state = state + item; - Pipe_Write(pipe, state); + Pipe_Write(pipe, state); } - *((uint32_t*)pipe->state) = state; + *((uint32_t*)pipe->state) = state; } /* Build the sum of all elements of the signal. */ @@ -52,9 +52,9 @@ void sum(pipe_t * const pipe) { uint32_t sum = 0; - while (Pipe_isFilled(pipe)) - sum += Pipe_Read(pipe); - Pipe_Write(pipe, sum); + while (Pipe_isFilled(pipe)) + sum += Pipe_Read(pipe); + Pipe_Write(pipe, sum); } /* Build the average of all elements of the signal. */ @@ -64,169 +64,84 @@ void average(pipe_t * const pipe) uint32_t element_counter = 0; uint32_t average = 0; - while (Pipe_isFilled(pipe)) + while (Pipe_isFilled(pipe)) { - sum += Pipe_Read(pipe); + sum += Pipe_Read(pipe); element_counter++; } average = sum / element_counter; - Pipe_Write(pipe, average); + Pipe_Write(pipe, average); } /* Print the signal. */ void print(pipe_t * const pipe) { printf("\nOutput:\n"); - while (Pipe_isFilled(pipe)) - printf("%d\n", Pipe_Read(pipe)); + while (Pipe_isFilled(pipe)) + printf("%d\n", Pipe_Read(pipe)); } /* Logging function. Set by user. */ void log(pipe_t * const source, pipe_t * const target, uint32_t element) { - if (Pipe_isFull(target)) - printf("Error: Pipe %s is full!\n", target->name); + if (Pipe_isFull(target)) + printf("Error: Pipe %s is full!\n", target->name); - if (source->state == NULL && target->state == NULL) - printf("%s -> %d -> %s\n", source->name, element, target->name); - else if (source->state != NULL && target->state != NULL) - printf("%s(%d) -> %d -> %s(%d)\n", source->name, *((uint32_t*)source->state), element, target->name, *((uint32_t*)target->state)); - else if (source->state != NULL) - printf("%s(%d) -> %d -> %s\n", source->name, *((uint32_t*)source->state), element, target->name); + if (source->state == NULL && target->state == NULL) + printf("%s -> %d -> %s\n", source->name, element, target->name); + else if (source->state != NULL && target->state != NULL) + printf("%s(%d) -> %d -> %s(%d)\n", source->name, *((uint32_t*)source->state), element, target->name, *((uint32_t*)target->state)); + else if (source->state != NULL) + printf("%s(%d) -> %d -> %s\n", source->name, *((uint32_t*)source->state), element, target->name); else - printf("%s -> %d -> %s(%d)\n", source->name, element, target->name, *((uint32_t*)target->state)); + printf("%s -> %d -> %s(%d)\n", source->name, element, target->name, *((uint32_t*)target->state)); } - -#define BUF_SIZE 255 - -void DisplayMessage(HANDLE hScreen, - char *ThreadName, int Data, int Count) -{ - - TCHAR msgBuf[BUF_SIZE]; - size_t cchStringSize; - DWORD dwChars; - - // Print message using thread-safe functions. - StringCchPrintf(msgBuf, BUF_SIZE, - TEXT("Executing iteration %02d of %s" - " having data = %02d \n"), - Count, ThreadName, Data); - StringCchLength(msgBuf, BUF_SIZE, &cchStringSize); - WriteConsole(hScreen, msgBuf, cchStringSize, - &dwChars, NULL); -} - -DWORD WINAPI Thread_no_2(LPVOID lpParam) -{ - - int Data = 0; - int count = 0; - HANDLE hStdout = NULL; - - // Get Handle To screen. - // Else how will we print? - if ((hStdout = - GetStdHandle(STD_OUTPUT_HANDLE)) - == INVALID_HANDLE_VALUE) - return 1; - - // Cast the parameter to the correct - // data type passed by callee i.e main() in our case. - Data = *((int*)lpParam); - - for (count = 0; count <= 4; count++) - { - DisplayMessage(hStdout, "Thread_no_1", Data, count); - } - - return 0; -} - -DWORD WINAPI Thread_no_1(LPVOID lpParam) -{ - - int Data = 0; - int count = 0; - HANDLE hStdout = NULL; - - // Get Handle To screen. - // Else how will we print? - if ((hStdout = - GetStdHandle(STD_OUTPUT_HANDLE)) - == INVALID_HANDLE_VALUE) - return 1; - - // Cast the parameter to the correct - // data type passed by callee i.e main() in our case. - Data = *((int*)lpParam); - - for (count = 0; count <= 4; count++) - { - DisplayMessage(hStdout, "Thread_no_2", Data, count); - } - - return 0; -} - -int main(void) +extern void threads(void); +int main(int argc, char *argv[]) { uint32_t counter = 0; - /* Create pipes and connect them */ - Pipe_Create(increment_pipe, 4, 1, NULL, NULL); - Pipe_Create(square_pipe, 4, 1, NULL, NULL); - Pipe_Create(integrate_pipe, 8, 2, &counter, log); - Pipe_Create(sum_pipe, 8, 1, NULL, log); - Pipe_Create(average_pipe, 8, 1, NULL, log); - Pipe_Create(print_pipe, 4, 1, NULL, log); + if (argc == 1) + { + /* Create pipes and connect them */ + Pipe_Create(increment_pipe, 4, 1, NULL, log); + Pipe_Create(square_pipe, 4, 1, NULL, log); + Pipe_Create(integrate_pipe, 8, 2, &counter, log); + Pipe_Create(sum_pipe, 8, 2, NULL, log); + Pipe_Create(average_pipe, 8, 2, NULL, log); + Pipe_Create(print_pipe, 4, 2, NULL, log); - Pipe_Connect(&increment_pipe, &integrate_pipe); - Pipe_Connect(&square_pipe, &integrate_pipe); - Pipe_Connect(&integrate_pipe, &sum_pipe); - Pipe_Connect(&integrate_pipe, &average_pipe); - Pipe_Connect(&sum_pipe, &print_pipe); - Pipe_Connect(&average_pipe, &print_pipe); + Pipe_Connect(&increment_pipe, &integrate_pipe); + Pipe_Connect(&square_pipe, &integrate_pipe); + Pipe_Connect(&integrate_pipe, &sum_pipe); + Pipe_Connect(&integrate_pipe, &average_pipe); + Pipe_Connect(&sum_pipe, &print_pipe); + Pipe_Connect(&average_pipe, &print_pipe); - /* Create Threads */ - int Data_Of_Thread_1 = 1; - int Data_Of_Thread_2 = 2; - HANDLE Handle_Of_Thread_1 = 0; - HANDLE Handle_Of_Thread_2 = 0; + /* Create Input */ + Pipe_Insert(&increment_pipe, 1); + Pipe_Insert(&increment_pipe, 3); + Pipe_Insert(&increment_pipe, 5); - Handle_Of_Thread_1 = CreateThread(NULL, 0, Thread_no_1, &Data_Of_Thread_1, 0, NULL); - if (Handle_Of_Thread_1 == NULL) - ExitProcess(Data_Of_Thread_1); + Pipe_Insert(&square_pipe, 2); + Pipe_Insert(&square_pipe, 4); + Pipe_Insert(&square_pipe, 6); - Handle_Of_Thread_2 = CreateThread(NULL, 0, Thread_no_2, &Data_Of_Thread_2, 0, NULL); - if (Handle_Of_Thread_2 == NULL) - ExitProcess(Data_Of_Thread_2); + /* run the functions (each can run in an own thread) */ + increment(&increment_pipe); + square(&square_pipe); + integrate(&integrate_pipe); + sum(&sum_pipe); + average(&average_pipe); + print(&print_pipe); - HANDLE Array_Of_Thread_Handles[] = { Handle_Of_Thread_1, Handle_Of_Thread_1 }; - WaitForMultipleObjects(3, Array_Of_Thread_Handles, TRUE, INFINITE); - - CloseHandle(Handle_Of_Thread_1); - CloseHandle(Handle_Of_Thread_2); - - /* Generate input */ - Pipe_Insert(&increment_pipe, 1); - Pipe_Insert(&increment_pipe, 3); - Pipe_Insert(&increment_pipe, 5); - - Pipe_Insert(&square_pipe, 2); - Pipe_Insert(&square_pipe, 4); - Pipe_Insert(&square_pipe, 6); - - /* run the functions (each can run in an own thread) */ - increment(&increment_pipe); - square(&square_pipe); - integrate(&integrate_pipe); - sum(&sum_pipe); - average(&average_pipe); - print(&print_pipe); - - getchar(); + getchar(); + } + else + { + threads(); + } return 0; } diff --git a/Pipe/pipe.h b/Pipe/pipe.h index df127ed..c339fb9 100644 --- a/Pipe/pipe.h +++ b/Pipe/pipe.h @@ -6,19 +6,16 @@ /* microsoft specific */ #ifdef _MSC_VER - #define inline __inline +#define inline __inline #endif -/* number of connections */ -#define PIPE_NUMBER_OF_CONNECTIONS 4 - typedef struct pipe_tt { ringbuffer_t * input; void * state; - struct pipe_tt ** connection; + struct pipe_tt ** connection; uint32_t connection_count; - uint32_t connection_max; + uint32_t connection_max; char * name; void(*log_function)(struct pipe_tt * from, struct pipe_tt * to, uint32_t elem); } pipe_t; @@ -34,7 +31,7 @@ So the signal can inserted to the pipe system. */ static inline void Pipe_Insert(pipe_t * const pipe, uint32_t element) { - RingBuffer_Write(pipe->input, element); + RingBuffer_Write(pipe->input, element); } /* @@ -43,7 +40,7 @@ Used inside functions of the pipe system. */ static inline uint32_t Pipe_Read(pipe_t * pipe) { - return RingBuffer_Read(pipe->input); + return RingBuffer_Read(pipe->input); } /* @@ -52,10 +49,10 @@ Used inside functions of the pipe system. */ static inline void Pipe_Write(pipe_t * pipe, uint32_t element) { - for (uint8_t i = 0; i < pipe->connection_count; i++) + for (uint8_t i = 0; i < pipe->connection_count; i++) { - pipe->log_function(pipe, pipe->connection[i], element); - RingBuffer_Write(pipe->connection[i]->input, element); + pipe->log_function(pipe, pipe->connection[i], element); + RingBuffer_Write(pipe->connection[i]->input, element); } } @@ -74,7 +71,7 @@ Usefull for the logging. */ static inline uint8_t Pipe_isFull(const pipe_t * pipe) { - return RingBuffer_IsFull(pipe->input); + return RingBuffer_IsFull(pipe->input); } /* @@ -83,17 +80,17 @@ Shall be set by user. */ static inline void Pipe_Log(pipe_t * const source, pipe_t * const target, uint32_t element) { - if (Pipe_isFull(target)) - printf("Error: Pipe %s is full!\n", target->name); + if (Pipe_isFull(target)) + printf("Error: Pipe %s is full!\n", target->name); - if (source->state == NULL && target->state == NULL) - printf("%s -> %d -> %s\n", source->name, element, target->name); - else if (source->state != NULL && target->state != NULL) - printf("%s(%d) -> %d -> %s(%d)\n", source->name, *((uint32_t*)source->state), element, target->name, *((uint32_t*)target->state)); - else if (source->state != NULL) - printf("%s(%d) -> %d -> %s\n", source->name, *((uint32_t*)source->state), element, target->name); - else - printf("%s -> %d -> %s(%d)\n", source->name, element, target->name, *((uint32_t*)target->state)); + if (source->state == NULL && target->state == NULL) + printf("%s -> %d -> %s\n", source->name, element, target->name); + else if (source->state != NULL && target->state != NULL) + printf("%s(%d) -> %d -> %s(%d)\n", source->name, *((uint32_t*)source->state), element, target->name, *((uint32_t*)target->state)); + else if (source->state != NULL) + printf("%s(%d) -> %d -> %s\n", source->name, *((uint32_t*)source->state), element, target->name); + else + printf("%s -> %d -> %s(%d)\n", source->name, element, target->name, *((uint32_t*)target->state)); } @@ -129,40 +126,40 @@ A State (NULL if function has no state) for the function using the pipe. A Name and a logging function are usefull to track the dataflow. */ static inline void Pipe_Init( - pipe_t * const pipe, - ringbuffer_t * const input, - pipe_t ** pipe_connections, - uint32_t connection_max, - void * state, - char * const name, - void(*log_function)(struct pipe_tt * source, struct pipe_tt * target, uint32_t element) - ) + pipe_t * const pipe, + ringbuffer_t * const input, + pipe_t ** pipe_connections, + uint32_t connection_max, + void * state, + char * const name, + void(*log_function)(struct pipe_tt * source, struct pipe_tt * target, uint32_t element) + ) { - pipe->input = input; - pipe->state = state; - pipe->connection = pipe_connections; + pipe->input = input; + pipe->state = state; + pipe->connection = pipe_connections; - for (uint8_t i = 0; i < connection_max; i++) - pipe->connection[i] = NULL; + for (uint8_t i = 0; i < connection_max; i++) + pipe->connection[i] = NULL; - pipe->connection_count = 0; - pipe->connection_max = connection_max; - pipe->name = name; + pipe->connection_count = 0; + pipe->connection_max = connection_max; + pipe->name = name; - if (log_function == NULL) - pipe->log_function = Pipe_Log; - else - pipe->log_function = log_function; + if (log_function == NULL) + pipe->log_function = Pipe_Log; + else + pipe->log_function = log_function; } /* Connect two pipes. Pipe a sends elements to pipe b. */ static inline void Pipe_Connect(pipe_t * const source, pipe_t * const target) { - if (source->connection_count < source->connection_max) - { - source->connection[source->connection_count] = target; - source->connection_count++; - } + if (source->connection_count < source->connection_max) + { + source->connection[source->connection_count] = target; + source->connection_count++; + } } #endif diff --git a/Pipe/ringbuffer.h b/Pipe/ringbuffer.h index ec777fc..2635f17 100644 --- a/Pipe/ringbuffer.h +++ b/Pipe/ringbuffer.h @@ -4,72 +4,73 @@ #include #ifdef _MSC_VER - #define inline __inline +#define inline __inline #endif typedef struct { - uint32_t * reader; - uint32_t * writer; - uint32_t * start; - uint32_t * end; + uint32_t * reader; + uint32_t * writer; + uint32_t * start; + uint32_t * end; } ringbuffer_t; /* Use Array as RingBuffer */ static inline void RingBuffer_InitFromArray(ringbuffer_t * const ring_buffer, uint32_t * const array, uint32_t size) { - ring_buffer->start = ring_buffer->reader = ring_buffer->writer = &array[0]; - ring_buffer->end = &array[0] + size - 1; + ring_buffer->start = ring_buffer->reader = ring_buffer->writer = &array[0]; + ring_buffer->end = &array[0] + size - 1; } static inline uint32_t * RingBuffer_NextAddress(ringbuffer_t * const ring_buffer, uint32_t * const pointer) { - if (pointer == ring_buffer->end) - return ring_buffer->start; - else - return pointer + 1; + if (pointer == ring_buffer->end) + return ring_buffer->start; + else + return pointer + 1; } static inline uint8_t RingBuffer_IsFull(ringbuffer_t * const ring_buffer) { - if (RingBuffer_NextAddress(ring_buffer, ring_buffer->writer) == ring_buffer->reader) - return 1; - return 0; + if (RingBuffer_NextAddress(ring_buffer, ring_buffer->writer) == ring_buffer->reader) + return 1; + return 0; } static inline uint8_t RingBuffer_IsEmpty(ringbuffer_t * const ring_buffer) { - if (ring_buffer->writer == ring_buffer->reader) - return 1; - return 0; + if (ring_buffer->writer == ring_buffer->reader) + return 1; + return 0; } static inline uint8_t RingBuffer_IsFilled(ringbuffer_t * const ring_buffer) { - return !RingBuffer_IsEmpty(ring_buffer); + return !RingBuffer_IsEmpty(ring_buffer); } /* Write element into RingBuffer. */ static inline void RingBuffer_Write(ringbuffer_t * const ring_buffer, uint32_t element) { - if (!RingBuffer_IsFull(ring_buffer)) - { - *(ring_buffer->writer) = element; - ring_buffer->writer = RingBuffer_NextAddress(ring_buffer, ring_buffer->writer); - } + if (!RingBuffer_IsFull(ring_buffer)) + { + *(ring_buffer->writer) = element; + ring_buffer->writer = RingBuffer_NextAddress(ring_buffer, ring_buffer->writer); + } } /* Read value from RingBuffer. */ static inline uint32_t RingBuffer_Read(ringbuffer_t * const ring_buffer) { - uint32_t element = 0; + uint32_t element = 0; - if (!RingBuffer_IsEmpty(ring_buffer)) - { - element = *(ring_buffer->reader); - ring_buffer->reader = RingBuffer_NextAddress(ring_buffer, ring_buffer->reader); - } + if (!RingBuffer_IsEmpty(ring_buffer)) + { + element = *(ring_buffer->reader); + *(ring_buffer->reader) = 5; + ring_buffer->reader = RingBuffer_NextAddress(ring_buffer, ring_buffer->reader); + } - return element; + return element; } #endif diff --git a/Pipe/threads.c b/Pipe/threads.c new file mode 100644 index 0000000..8ceeb6f --- /dev/null +++ b/Pipe/threads.c @@ -0,0 +1,159 @@ +#include +#include +#include +#include + +#include "ringbuffer.h" +#include "pipe.h" + +/* Integrate every element of the signal. */ +static void increment(pipe_t * const p) +{ + while (Pipe_isFilled(p)) + { + uint32_t item = Pipe_Read(p); + item++; + Pipe_Write(p, item); + } +} + + +/* Print the signal. */ +static void write_to_file1(pipe_t * const pipe) +{ + HANDLE hFile; + char DataBuffer[128]; + DWORD dwBytesWritten = 0; + + hFile = CreateFile("thread1.txt", GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); + + while (1) { + while (Pipe_isFilled(pipe)) + { + sprintf_s(DataBuffer, 128, "%d\r\n", Pipe_Read(pipe)); + WriteFile(hFile, DataBuffer, (DWORD)strlen(DataBuffer), &dwBytesWritten, NULL); + } + } +} + +static void write_to_file2(pipe_t * const pipe) +{ + HANDLE hFile; + char DataBuffer[128]; + DWORD dwBytesWritten = 0; + + hFile = CreateFile("thread2.txt", GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); + + while (1) { + while (Pipe_isFilled(pipe)) + { + sprintf_s(DataBuffer, 128, "%d\r\n", Pipe_Read(pipe)); + WriteFile(hFile, DataBuffer, (DWORD)strlen(DataBuffer), &dwBytesWritten, NULL); + } + } +} + +DWORD WINAPI incrementThread1(LPVOID lpParam) +{ + pipe_t * pipe = (pipe_t *)lpParam; + LARGE_INTEGER time; + + while (1) + { + /* Generate input */ + QueryPerformanceCounter(&time); + Pipe_Insert(pipe, (uint32_t)time.u.LowPart); + QueryPerformanceCounter(&time); + Pipe_Insert(pipe, (uint32_t)time.u.LowPart); + QueryPerformanceCounter(&time); + Pipe_Insert(pipe, (uint32_t)time.u.LowPart); + + increment(pipe); + } + + return 0; +} + +DWORD WINAPI incrementThread2(LPVOID lpParam) +{ + pipe_t * pipe = (pipe_t *)lpParam; + LARGE_INTEGER time; + + while (1) + { + /* Generate input */ + QueryPerformanceCounter(&time); + Pipe_Insert(pipe, (uint32_t)time.u.LowPart); + QueryPerformanceCounter(&time); + Pipe_Insert(pipe, (uint32_t)time.u.LowPart); + QueryPerformanceCounter(&time); + Pipe_Insert(pipe, (uint32_t)time.u.LowPart); + + increment(pipe); + } + + return 0; +} + +DWORD WINAPI incrementThread3(LPVOID lpParam) +{ + pipe_t * pipe = (pipe_t *)lpParam; + + while (1) + { + increment(pipe); + } + + return 0; +} + +DWORD WINAPI Thread2(LPVOID lpParam) +{ + pipe_t * pipe = (pipe_t *)lpParam; + write_to_file2(pipe); + + return 0; +} + +DWORD WINAPI Thread1(LPVOID lpParam) +{ + pipe_t * pipe = (pipe_t *)lpParam; + write_to_file1(pipe); + + return 0; +} + +static void log(pipe_t * const source, pipe_t * const target, uint32_t element) {} + +void threads(void) +{ + /* Create pipes and connect them */ + Pipe_Create(increment_pipe1, 4, 1, NULL, log); + Pipe_Create(increment_pipe2, 4, 1, NULL, log); + Pipe_Create(increment_pipe3, 8, 2, NULL, log); + Pipe_Create(write_to_file1_pipe, 8, 1, NULL, log); + Pipe_Create(write_to_file2_pipe, 8, 1, NULL, log); + + Pipe_Connect(&increment_pipe1, &increment_pipe3); + Pipe_Connect(&increment_pipe2, &increment_pipe3); + Pipe_Connect(&increment_pipe3, &write_to_file1_pipe); + Pipe_Connect(&increment_pipe3, &write_to_file2_pipe); + + /* Create Threads */ + CreateThread(NULL, 0, incrementThread1, &increment_pipe1, 0, NULL); + CreateThread(NULL, 0, incrementThread2, &increment_pipe2, 0, NULL); + CreateThread(NULL, 0, incrementThread3, &increment_pipe3, 0, NULL); + + HANDLE thread1Handle, thread2Handle; + + thread1Handle = CreateThread(NULL, 0, Thread1, &write_to_file1_pipe, 0, NULL); + thread2Handle = CreateThread(NULL, 0, Thread2, &write_to_file2_pipe, 0, NULL); + + HANDLE threadHandles[] = { thread1Handle, thread2Handle }; + WaitForMultipleObjects(2, threadHandles, TRUE, INFINITE); + + CloseHandle(thread1Handle); + CloseHandle(thread2Handle); + + return 0; +} \ No newline at end of file