This post presents a class that can capture a single keypress in the terminal console without blocking background processes.
The script starts with a Fibonacci subroutine running in the background and a keyboard listener running in the foreground. The listener is an instance of the class NonBlockingConsole,
which, as its name suggests, reads keypresses without blocking.
The script can be stopped in either of the following ways:
- Press q to stop manually.
- Wait until 50 Fibonacci numbers are found.
Meanwhile, you can press f to check the progress of the background process while the script is running.
Output
# python3 pc-1.py Menu -------------------------------- q - quit f - show FibNum progress Process FibNum(P1) is running... P1: F(10)... P1: F(12) = 144 P1: F(20)... P1: F(30)... Key a is sent to Queue. Key s is sent to Queue. P1: F(36) = 14930352 P1: F(40)... Key n is sent to Queue. Key k is sent to Queue. P1: F(50)... P1: Stop all processes... Quit gracefully.
Source Code
#!/usr/bin/python3
"""Capture single keypresses without blocking background processes.

A Fibonacci worker process runs in the background while the main process
polls the keyboard.  Press ``q`` to quit, ``f`` to have the worker report its
progress; any other key is echoed through the message queue.  The script also
stops on its own once the worker has produced 50 Fibonacci numbers.
"""
from multiprocessing import Process, Queue, Value, Lock
import codecs
import ctypes
import os
import queue
import select
import sys
import termios
import time
import tty


# Non-blocking console single keypress input
class NonBlockingConsole(object):
    """Context manager that puts the terminal in cbreak mode for key polling.

    On entry the current terminal attributes of stdin are saved and cbreak
    mode is enabled; on exit the saved attributes are restored.
    """

    def __enter__(self):
        self.old_settings = termios.tcgetattr(sys.stdin)
        tty.setcbreak(sys.stdin.fileno())
        return self

    def __exit__(self, exc_type, value, traceback):
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)

    @staticmethod
    def _readable(fd, timeout=0):
        """Return True if *fd* has data to read within *timeout* seconds."""
        return bool(select.select([fd], [], [], timeout)[0])

    def get_data(self):
        """Return one pending character from stdin, or False if none.

        Only the raw character is returned; special keys are not identified.
        An arrow key, for example, arrives as the three characters
        ESC, '[' and one of 'A'/'B'/'C'/'D' (up/down/right/left), delivered
        one per call, and backspace arrives as the character with code 127.

        The character is read straight from the file descriptor rather than
        through ``sys.stdin.read``: the buffered text layer may swallow
        several pending keystrokes at once, after which ``select`` would
        report nothing to read although keys are still waiting in Python's
        buffer.

        At end of input a falsy value (an empty string) is returned.
        """
        fd = sys.stdin.fileno()
        if not self._readable(fd):
            return False

        encoding = getattr(sys.stdin, "encoding", None) or "utf-8"
        decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
        char = ""
        while not char:
            chunk = os.read(fd, 1)
            if not chunk:
                # End of input: flush whatever the decoder still holds.
                return decoder.decode(b"", final=True)
            char = decoder.decode(chunk)
            # A multi-byte character needs more bytes; give the rest a
            # short moment to arrive instead of blocking indefinitely.
            if not char and not self._readable(fd, 0.05):
                return decoder.decode(b"", final=True)
        return char


# Processes
# ---------
def FibNum(pRun, pOut, pLock, pQueue, pLimit=50, pDelay=0.5):
    """Compute Fibonacci numbers F(3)..F(pLimit), one per *pDelay* seconds.

    Args:
        pRun: shared boolean; the loop runs while it is true, and it is
            cleared once F(pLimit) has been computed.
        pOut: shared boolean; when true, the current result is sent to
            *pQueue* and the flag is cleared again.
        pLock: lock held while a shared value is updated.
        pQueue: message queue receiving the progress reports.
        pLimit: index of the last Fibonacci number to compute.
        pDelay: pause in seconds between two numbers.
    """
    prev, curr = 1, 1
    idx = 3
    print('Process FibNum(P1) is running...\r')
    while pRun.value:
        nxt = prev + curr
        if idx % 10 == 0:
            print(f'P1: F({idx})...\r')
        if pOut.value:
            pQueue.put(f'P1: F({idx}) = {nxt}\r')
            with pLock:
                pOut.value = False
        prev, curr = curr, nxt
        idx += 1
        time.sleep(pDelay)
        if idx > pLimit:
            with pLock:
                pRun.value = False
            print('P1: Stop all processes...\r')


def OutputMsg(pRun, pQueue):
    """Print every message arriving on *pQueue* while *pRun* is true.

    The queue is read with a short timeout instead of spinning on
    ``empty()``, so the process sleeps while idle yet still notices promptly
    when *pRun* is cleared.  Messages still queued at shutdown are printed
    before returning.
    """
    while pRun.value:
        try:
            msg = pQueue.get(timeout=0.1)
        except queue.Empty:
            continue
        print(msg)

    # Drain what was queued just before the run flag was cleared.
    while True:
        try:
            msg = pQueue.get_nowait()
        except queue.Empty:
            break
        print(msg)


# main program
# -------------------------------------------------
def main():
    """Start the worker processes and run the keyboard listener."""
    # a message queue for communication between processes
    messages = Queue()

    # the following are shared variables used in the
    # main program and the processes.
    run_flag = Value(ctypes.c_bool, True)   # continue to run
    fib_out = Value(ctypes.c_bool, False)   # print out in process 1

    # a lock is required whenever a process wants to update
    # a shared value
    lock = Lock()

    # show menu
    print(' Menu')
    print('--------------------------------')
    print(' q - quit')
    print(' f - show FibNum progress')
    print()

    # define processes
    printer = Process(target=OutputMsg, args=(run_flag, messages))
    worker = Process(target=FibNum, args=(run_flag, fib_out, lock, messages))

    # start all processes
    printer.start()
    worker.start()

    try:
        # activate non blocking keypress listener
        with NonBlockingConsole() as nonblock:
            while run_flag.value:
                key = nonblock.get_data()
                if key:
                    if key == "q":
                        # stop all processes
                        with lock:
                            run_flag.value = False
                        print("Manual Quitting the program...")
                    elif key == "f":
                        with lock:
                            fib_out.value = True
                    else:
                        messages.put(f"Key {key} is sent to Queue.")
                time.sleep(0.5)  # reduce power consumption
    finally:
        # Stop the workers even if the listener failed (e.g. stdin is not a
        # terminal), then wait for all processes to terminate.
        with lock:
            run_flag.value = False
        printer.join()
        worker.join()

    # quit gracefully
    print()
    print("Quit gracefully.")


# The guard keeps child processes from re-running the main program when the
# start method re-imports this module (spawn, forkserver).
if __name__ == "__main__":
    main()
No comments:
Post a Comment