Monday, 12 May 2025

Non-blocking Console Single Keypress Input

This post presents a class that can capture single keypress in the terminal console without blocking the background processes.

The script starts with a Fibonacci subroutine running in background and a keyboard listener running in foreground. The listener is an instance derived from the class NonBlockingConsole which functions as suggested by its name.

The script can be stopped by either one of the way:

  1. Press q to stop manually.
  2. Wait until 50 Fibonacci numbers are found.

Meanwhile, you can press f to check the progress of the background process while the script is running.


Output


# python3 pc-1.py 
  Menu
--------------------------------
    q - quit
    f - show FibNum progress

Process FibNum(P1) is running...
P1: F(10)...
P1: F(12) = 144
P1: F(20)...
P1: F(30)...
Key a is sent to Queue.
Key s is sent to Queue.
P1: F(36) = 14930352
P1: F(40)...
Key n is sent to Queue.
Key k is sent to Queue.
P1: F(50)...
P1: Stop all processes...

Quit gracefully.

Source Code


#!/usr/bin/python3

from multiprocessing import Process, Queue, Value, Lock
import ctypes
import sys, select, tty, termios
import time
 
# Non-blocking console single keypress input
class NonBlockingConsole(object):
    def __enter__(self):
        self.old_settings = termios.tcgetattr(sys.stdin)
        tty.setcbreak(sys.stdin.fileno())
        return self

    def __exit__(self, type, value, traceback):
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)

    def get_data(self):
        if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
            '''
            # version for detecting some
            # special keys
            ch = sys.stdin.read(1)
            if ord(ch)==27:
                ch1 = sys.stdin.read(1)
                ch2 = sys.stdin.read(1)
                if ch1=='[' and ch2 == 'A':
                    ch = 'up'
                elif ch1=='[' and ch2 == 'B':
                    ch = 'down'
                elif ch1=='[' and ch2 == 'C':
                    ch = 'right'
                elif ch1=='[' and ch2 == 'D':
                    ch = 'left'
            elif ord(ch)==127:
                ch = 'backspace'
            return ch
            '''
            # return raw code only without
            # identifying special keys
            return sys.stdin.read(1)
        return False


# Processes
# ---------
def FibNum(pRun, pOut, pLock, pQueue):
    mN1 = 1
    mN2 = 1
    mIdx = 3
    print('Process FibNum(P1) is running...\r')
    while pRun.value==True:
        mN = mN1 + mN2
        if mIdx%10==0:
            print(f'P1: F({mIdx})...\r')
        if pOut.value==True:
            mmsg = f'P1: F({mIdx}) = {mN}\r'
            pQueue.put(mmsg)
            with pLock:
                pOut.value = False
        mN1 = mN2
        mN2 = mN
        mIdx += 1
        time.sleep(0.5)
        if mIdx > 50:
            with pLock:
                pRun.value = False
                print('P1: Stop all processes...\r')

def OutputMsg(pRun, pQueue):
    while pRun.value==True:
        if pQueue.empty():
            continue
        mmsg = pQueue.get()
        print(mmsg)


# main program
# -------------------------------------------------
# a message queue for communication between
# process
gQueue = Queue()
# the following are shared variables used in the
# main program and the processes.
gRun = Value(ctypes.c_bool,True) # continue to run
gP1Out = Value(ctypes.c_bool,False) # print out in process 1
# a lock is required whenever a process wants to update
# a shared value
gLock = Lock()

# show menu
print('  Menu')
print('--------------------------------')
print('    q - quit')
print('    f - show FibNum progress')
print()

# define processes
p0 = Process(target=OutputMsg,args=(gRun, gQueue))
p1 = Process(target=FibNum,args=(gRun, gP1Out, gLock, gQueue))

# start all processes
p0.start()
p1.start()

# activate non blocking keypress
# listener
with NonBlockingConsole() as nonblock:
    while gRun.value:
        inp = nonblock.get_data()
        if inp:
            if (inp=="q"):
                # stop all processes
                with gLock:
                    gRun.value = False
                print("Manual Quitting the program...")
            elif (inp=="f"):
                with gLock:
                    gP1Out.value = True
            else:
                mmsg = f"Key {inp} is sent to Queue."
                gQueue.put(mmsg)
        time.sleep(0.5) # reduce power consumption

# wait for all processes to terminate
p0.join()
p1.join()

# quite gracefully
print()
print("Quit gracefully.")

No comments:

Post a Comment