Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loop stuck, runs only once on each keypress #15

Closed
bskrt opened this issue Jul 30, 2013 · 7 comments
Closed

Loop stuck, runs only once on each keypress #15

bskrt opened this issue Jul 30, 2013 · 7 comments

Comments

@bskrt
Copy link

bskrt commented Jul 30, 2013

I'm working on a project that needs to do certain things on specific keypresses.

To do this I'm using a loop to check the value of the pressed key and do something accordingly. The problem I'm having is that the while loop is interrupted whenever I'm using the getKey() function, so the rest of the loop only runs once after each keypress.

I'm probably doing something very basic horribly wrong as I've just started out with python recently. A push in the right direction would be of great help!

from evdev import InputDevice, list_devices, categorize, ecodes

dev = InputDevice('/dev/input/event0')
def getKey():
    for event in dev.read_loop():
        if event.type == ecodes.EV_KEY:
            keybrd = categorize(event)
            if keybrd.keystate == keybrd.key_down:
                c = keybrd.keycode
                return c
                c = ''

while 42:
    c = getKey()
    print 'expected to be looping'

    if c == 'KEY_Z':
        print 'dosomething'
    if c == 'KEY_Q':
        print 'dosomethingelse'
    runSomeOtherFunctionThatNeedsToBeLooped()
@gvalkov
Copy link
Owner

gvalkov commented Jul 30, 2013

Hello,

You could turn getKey() into a generator by replacing return c with yield c. For example:

from evdev import *

dev = InputDevice('/dev/input/event0')

def getKey():
    for event in dev.read_loop():
        if event.type == ecodes.EV_KEY:
            e = categorize(event)
            if e.keystate == e.key_down:
                yield e.keycode

keygenerator = getKey()
while 42:
    c = next(keygenerator)
    if c == 'KEY_Z': print('dosomething')
    if c == 'KEY_Q': print('dosomethingelse')

# or you could consume the generator directly in a for loop
#for c in getKey():
#   print(c)

Here is a nice tutorial on generators and iterators.

Btw, have a look at pyzmo, a hotkey daemon that's also usable as a library. Maybe it fits your use case.

Cheers and best of luck with your project!

@bskrt
Copy link
Author

bskrt commented Jul 30, 2013

First off thanks for the incredibly swift reply!

It's still not really doing what I'd expect it to do though.
What I basically want to accomplish is:

  1. detect keypresses and do something accordingly. That part works.
  2. still loop over other code while being able to detect key presses

right now, whenever I start the keypress detection, all code I insert after it doesn't get called.
e.g. if I add a print statement after the key detection, it only gets called whenever a key has been pressed while I would expect it to continuously print.

while 42:
    c = next(keygenerator)
    if c == 'KEY_Z': print('dosomething')
    if c == 'KEY_Q': print('dosomethingelse')
    print "printing all the time!"

This is probably a totally ridiculous issue due to a lack of python skills but I can't seem to wrap my head around it.

Thanks again,
Kevin

@gvalkov
Copy link
Owner

gvalkov commented Jul 30, 2013

I think I understand now. You could use async io or threads with a queue.

Async:

from evdev import *
from select import select

dev = InputDevice('/dev/input/event3')

while 42:
    r,w,x = select([dev.fd], [], [], 0.1)

    # if there is something to be read from dev
    if r: 
        for event in dev.read():
            print(event)

    print('running every 0.1s')

Threads:

from evdev import *
from time import sleep
from threading import Thread
from Queue import Queue

dev = InputDevice('/dev/input/event3')
events = Queue()

def worker():
    for event in dev.read_loop():
        if event.type == ecodes.EV_KEY:
            events.put(event)

t = Thread(target=worker)
t.start()

while 42:
    if not events.empty():
        event = events.get_nowait()
        print(event)

    print('looping')
    sleep(0.1)

@bskrt
Copy link
Author

bskrt commented Jul 31, 2013

That's exactly what I needed, thanks so much!
Is any of both options to be recommended over the other or are both equally valid?

I'm still wondering though, what's the reason evdev is blocking the following code in the while loop. To be more precise, what part of the module causes the code to "break". Has it anything to do with "yield"?
Still in an early learning stage and trying to understand what part of the evdev code is actually stopping a normally infinite loop.

Thanks again, you've been a huge help!

@gvalkov
Copy link
Owner

gvalkov commented Jul 31, 2013

Glad I could help. If you have a look at device.read_loop(), you'll see that it uses a select() on the file descriptor of the input device. This is what blocks the program until input events can be read from the device.

select() in a nutshell:

dev1 = InputDevice(...)
dev2 = InputDevice(...)

fd2dev = {
   dev1.fd: dev1,
   dev2.fd: dev2,
}

while True:
    # this select call will block until there are input events that can be read from dev1 or dev2
    fdlist, _, _ = select([dev1.fd, dev2.fd], [], [])  # you can add an optional timeout argument here

    # fdlist is a list of file descriptors that can be read from 
    for fd in fdlist:
        dev = fd2dev[fd]  # get the InputDevice from the fd

        # this is guaranteed to return input events
        for event in dev.read():
            print(event)

Yielding is irrelevant in this case:

def read_events_yield(dev):
    r,w,x = select([dev.fd], [], [])
    for event in dev.read():
        yield event

def read_events_list(dev):
    events = []
    r,w,x = select([dev.fd], [], [])
    for event in dev.read():
        events.append(event)
    return events

# Iterating over the result of these functions will return the same events.
# The former is more memory efficient as events are consumed as soon as they
# are read, while in the latter you first build a list of events in memory and
# then return it.

The asynchronous approach is less error prone, imho. It really depends on what
kind of program you're trying to write (async io is more suitable your program
can be structured around events to which it can react).

@bskrt
Copy link
Author

bskrt commented Jul 31, 2013

Thanks a lot Georgi, you've been a great help in my journey to conquer python! :)

@mendhak
Copy link

mendhak commented Dec 11, 2013

Thanks for this, it's a great way to put the evdev loop into a Thread and allow it to be terminated. I'm using a 'keepRunning' variable which helps when I want to stop reading.

class Touchscreen(threading.Thread):
def __init__(self):
    self.keepRunning = True
    super(Touchscreen, self).__init__()


def run(self):
    dev = evdev.InputDevice("/dev/input/event0")
    while self.keepRunning:
        r,w,x = select.select([dev.fd], [], [], 0.1)
        if r:
            for event in dev.read():
                print(event)

def die(self):
    self.keepRunning = False

Then from elsewhere I just call die.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants