
Messages are getting overlapped when logging in the sys.stderr using multiprocessing #108

Closed
chkoar opened this issue Jul 1, 2019 · 27 comments
Labels
bug Something isn't working

Comments

@chkoar

chkoar commented Jul 1, 2019

I am trying to log to sys.stderr using multiprocessing but the log messages are getting overlapped. Is this the intended behavior?

2019-07-01 15:19:49.197 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-01 15:19:49.2052019-07-01 15:19:49.205 |  | SUCCESS SUCCESS  |  | __mp_main____mp_main__::workerworker::1414 -  - My function executed successfullyMy function executed successfully

2019-07-01 15:19:50.198 | SUCCESS  | __mp_main__2019-07-01 15:19:50.200: | workerSUCCESS : | 14__mp_main__ - :My function executed successfullyworker
:14 - My function executed successfully
2019-07-01 15:19:50.205 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-01 15:19:50.209 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-01 15:19:51.207 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-01 15:19:52.198 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-01 15:19:52.209 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully

Here is the code that generated the above output.

import sys
import time
import random
from concurrent.futures import ProcessPoolExecutor

from loguru import logger

logger.remove()
logger.add(sys.stderr, enqueue=True)


def worker(seconds):
    time.sleep(seconds)
    logger.success("My function executed successfully")


def main():

    with ProcessPoolExecutor() as executor:
        seconds = [random.randint(1, 3) for i in range(10)]
        executor.map(worker, seconds)


if __name__ == "__main__":
    main()

Cheers,
Chris

@chkoar chkoar changed the title Messages are getting overlapped when printing in the sys.stderr using multiprocessing Messages are getting overlapped when logging in the sys.stderr using multiprocessing Jul 1, 2019
@Delgan
Owner

Delgan commented Jul 1, 2019

Hey @chkoar, thanks for the bug report!

I can reproduce this, but only when using Windows (works fine with Linux). Are you using Windows too?

@chkoar
Author

chkoar commented Jul 2, 2019

Are you using Windows too?

Correct. I can confirm that on Linux it works as intended.
Do you think we could overcome this on Windows?

@Delgan
Owner

Delgan commented Jul 3, 2019

There is surely something that can be done about this, but I don't know what exactly yet. 😛

This is related to the lack of fork() on Windows. Multiprocessing on Windows is implemented by launching a new process and executing the whole Python file once again to retrieve global variables (like your worker function). This is called "spawning".

Everything that is outside of if __name__ == "__main__": is executed as many times as processes are started. So, each new process will .add() a new handler with its own non-shared queue (Loguru uses a multiprocessing.SimpleQueue() when you add enqueue=True), which defeats the purpose.

Putting logger.add() under the if condition does not help either, as the child processes would then fall back to the default, unprotected stderr handler. Finally, passing the logger to your worker, besides not being very elegant, would not work either, as SimpleQueue() is not picklable.

I will need to investigate this further, notably by looking at the recipes from the standard logging library: Logging to a single file from multiple processes.

In the meantime, you can add colorize=False: on Windows, stderr needs to be flushed while colors are added to the output, and this causes multiprocess logs to be mixed up very often.
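For clarity, a minimal sketch of that temporary workaround applied to the initial snippet (same handler, colors simply disabled):

import sys

from loguru import logger

logger.remove()
# Mitigation discussed above: disabling colorization avoids the extra flushes
# on Windows that make the interleaving much more frequent. enqueue=True is
# kept as in the original snippet.
logger.add(sys.stderr, enqueue=True, colorize=False)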

@chkoar
Author

chkoar commented Jul 4, 2019

Well, actually, adding colorize=False in the add() method, or changing the default sink with logger._handlers[0].colorize = False, seems to solve the problem.

2019-07-04 14:29:53.944 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:54.938 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:54.938 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:54.943 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:55.939 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:55.941 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:55.944 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:56.942 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:56.945 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:56.945 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:56.945 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:57.940 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:57.947 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:58.940 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:58.943 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:58.946 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:59.945 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:59.948 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:29:59.948 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully
2019-07-04 14:30:00.940 | SUCCESS  | __mp_main__:worker:14 - My function executed successfully

In the meantime, you can add colorize=False: on Windows, stderr needs to be flushed while colors are added to the output, and this causes multiprocess logs to be mixed up very often.

I think it should be stated in the docs that if you use multiprocessing without setting colorize=False (colorization apparently being enabled by default for terminals), you will encounter the problem I described in this issue.

@Delgan
Owner

Delgan commented Jul 4, 2019

Instead of updating the documentation, I think I should find the proper implementation to prevent logs from being mixed up, no matter the value of colorize. This is the purpose of enqueue; it's supposed to avoid such weirdness!

Also, I think setting colorize=False is a quick fix which drastically reduces the risk, but it does not inherently prevent this bug from happening.

@Delgan
Owner

Delgan commented Jul 6, 2019

Ok, I think I better understand what is going on. So here are a few notes to myself.

  • multiprocessing on Windows uses "spawn" due to the lack of "fork" (this behavior can be reproduced on Linux too using multiprocessing.set_start_method(); see the sketch after this list)
  • When "spawning" a child process, the whole Python script needs to be re-executed (except for the code under "__main__")
  • If logger.add() is called outside of "__main__", then every child process will have a different handler, so synchronization is not possible
  • Similarly, calling logger.remove() outside of "__main__" implies that child processes will not deliver messages to any sink, and not calling logger.add() or logger.remove() at all means that they will use the default sys.stderr (without synchronization)
  • Objects passed to args=... while starting Process() need to be serializable using pickle (except for multiprocessing.Queue(), for some unknown reason)
  • Class attributes are like global variables; they are "initialized" during class definition
  • After passing the logger to args during child process init, it will need to be deserialized, so loguru will be imported. If from loguru import logger is done outside of __main__, the module is already loaded, so it will inherit the existing handlers as discussed previously. If from loguru import logger is done inside of __main__, the module is entirely re-imported, so the class attributes are reset to their default values.
  • A possible workaround would be to remove all class attributes in logger so handlers are properly inherited, but that still requires the logger to be passed through args. However, this will probably raise other problems: if _handlers is an instance attribute (rather than a class attribute), it will need to be pickled too (currently, pickle.loads(pickle.dumps(logger)) will reload _handlers as a "global" reference, I guess), but sinks are not necessarily picklable (particularly if they are closure functions).
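To make the first two notes concrete, here is a minimal, hypothetical sketch (a standalone script) showing the re-execution behavior; the module-level print runs once in the parent and once more per spawned child:

import multiprocessing
import os

# Runs in the parent *and* again in every spawned child, because "spawn"
# re-imports the whole module to rebuild global state.
print(f"module-level code executed in pid {os.getpid()}")


def worker():
    pass


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")  # mimic the Windows behavior on Linux
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()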

Many headaches ahead. :)

@Delgan Delgan added the enhancement Improvement to an already existing feature label Jul 9, 2019
@pylipp
Contributor

pylipp commented Aug 5, 2019

@Delgan Should the use of loguru in a multiprocessing context on Windows be discouraged then? I ran into a similar issue; see the following snippet:

import time
import multiprocessing

from loguru import logger

LOGGER_CONFIG = {
    "handlers": [
        dict(
            sink="test.log",
            serialize=True,
            enqueue=True,
            level=20,
        ),
    ],
}

class BusyLogProcess(multiprocessing.Process):
    """A process that logs very fast."""

    def run(self):
        print(f"Busy {logger}")
        print(id(logger))
        while True:
            logger.info("a")
            time.sleep(0.01)


if __name__ == "__main__":
    #multiprocessing.set_start_method("spawn")

    # Set up logger
    print(logger._handlers)
    logger.configure(**LOGGER_CONFIG)

    logger.info("testing in main")

    busy_log_process = BusyLogProcess()
    busy_log_process.start()

    # Let BusyLogProcess create some log entries
    time.sleep(0.5)

    busy_log_process.terminate()

    print(logger._handlers)

When running this under Linux, all log entries go into test.log, and nothing is printed to the terminal (as expected).
When running under Windows (or under Linux with multiprocessing.set_start_method('spawn')), only the message from the main process is logged into the file. The messages from BusyLogProcess are printed to the terminal.

@Delgan
Owner

Delgan commented Aug 5, 2019

Hi @pylipp.

Thanks for the code snippet; it is another useful illustration of the problem. I really did not expect multiprocessing on Windows to be that complicated.

You are right: unfortunately, it seems that Loguru's logger is totally unreliable in such a situation for now. 😕

I'm thinking of a possible workaround, but it's still not precisely defined in my mind.

@contang0

contang0 commented Sep 13, 2019

I'm not getting any log messages from the child processes (created with multiprocessing.Pool), even with enqueue set to True. :/

This is my config:

logger.remove(0)
logger.add(sys.stderr, level='DEBUG', enqueue=True)
logger.add(f"file_{now}.log", level='INFO', enqueue=True)

And yes, I'm also on Windows.

@Delgan
Owner

Delgan commented Sep 13, 2019

Hey @Joolius, would it be possible to please share a complete code snippet, with multiprocessing and your workers using the logger? It would help to have a fully reproducible example to work with.

@rclapp

rclapp commented Sep 27, 2019

@Delgan I see a similar issue when using multiprocessing. Messages via sys.stderr overlap. Plus, any messages sent by my child libraries are not captured. My child libraries are using traditional logging, since I thought I could work around the problem that way, but no luck. I tried to relocate the handler generation inside of main, but no luck. I also tried my own queue handler. Oddly, before I moved to loguru, it worked okay with the normal logging module plus a custom queue handler (below).

Here are the relevant sections of code:

def main():

    arguments = docopt(__doc__, version='1.0.0')
    logfile = arguments["--logfile"]

    #Logging Set-Up Loguru
    logger.remove()

    #Intercept Logging Messages
    logging.basicConfig(handlers=[InterceptHandler()], level=0)

    logger.add(MultiprocessingLogHandler('c:\dev\mystreamhandler.log'), level='DEBUG')

    if arguments["debug"]:
        logger.add(sys.stdout, level='DEBUG', filter='__main__')
    else:
        logger.add(sys.stdout, level='INFO', filter='__main__')

    if logfile is not None:
        try:
            if arguments["debug"]:
                logger.add(logfile, level="DEBUG", enqueue=True, retention="10 days", backtrace=True, diagnose=True)
            else:
                logger.add(logfile, level="INFO", enqueue=True, retention="10 days")
        except Exception as e:
            logger.exception('Failed to add file handlers')

    ### DO OTHER THINGS
    for index, row in records.iterrows():
        results = pool.apply_async(func=calculations.Run_Allocation_Passes,
                                   args=(tm1, max_passes, row['Scenario'], row['Cluster'], row['Time Monthly'],
                                         row['Service'], output_file_path,),
                                   callback=update)

    logger.debug('this will work')
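For context, InterceptHandler is not shown above; presumably it follows the recipe from the Loguru documentation for redirecting standard logging records into Loguru, roughly along these lines:

import logging

from loguru import logger


class InterceptHandler(logging.Handler):
    def emit(self, record):
        # Map the standard logging level to the corresponding Loguru level name when possible.
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Walk back up the stack so Loguru reports the original caller, not the logging module.
        frame, depth = logging.currentframe(), 2
        while frame and frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())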

Logging inside the main module works, but messages from sub-modules called via apply_async seem to go nowhere. As I mentioned, the sub-modules are using logging like this:

import logging
logger = logging.getLogger(__name__)

Here is the custom handler that worked with traditional logging:

import logging
import multiprocessing
import sys
import threading
import traceback
# Assumption: FH was presumably an alias for a file handler class, e.g.:
from logging import FileHandler as FH


class MultiprocessingLogHandler(logging.Handler):
    """multiprocessing log handler

    This handler makes it possible for several processes
    to log to the same file by using a queue.

    """
    def __init__(self, fname):
        logging.Handler.__init__(self)

        self._handler = FH(fname)
        self.queue = multiprocessing.Queue(-1)

        thrd = threading.Thread(target=self.receive)
        thrd.daemon = True
        thrd.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)

@Delgan
Owner

Delgan commented Sep 28, 2019

Hey @rclapp, thanks for the detailed explanations!

The problem is that, while using multiprocessing on Windows, the whole Python file has to be re-executed. This disturbs Loguru: either you call add() inside of __main__ and the second process is not initialized with the proper handler, or you call it outside of __main__ and the handler is added twice with two different Queue objects.

This is why, on Windows, the logger has to be passed somehow to the child processes. However, this currently does not work, because the logger is implemented using class attributes (which are not passed to the child process). I planned to refactor the logger; it should work better in the next release.

Also, concerning the interoperability with standard logging: your MultiprocessingLogHandler needs to be shared with the child processes. For the reasons I explained, child processes do not automatically inherit handlers added in the main process.
I think you should try to make use of the initializer argument of multiprocessing.Pool() as described in this StackOverflow answer (make sure your MultiprocessingLogHandler() instances use the same queue object among all child processes).
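A rough sketch of that initializer pattern for standard logging (hypothetical names; the queue is created once in the parent and handed to every worker when the pool starts):

import logging
import multiprocessing
from logging.handlers import QueueHandler


def init_worker(queue):
    # Runs once in each child process: route the child's standard logging
    # records into the queue shared with the parent.
    root = logging.getLogger()
    root.addHandler(QueueHandler(queue))
    root.setLevel(logging.DEBUG)


if __name__ == "__main__":
    queue = multiprocessing.Queue(-1)
    # ... a receiver thread (or logging.handlers.QueueListener) in the parent
    # would consume this queue and write to the actual sinks ...
    pool = multiprocessing.Pool(4, initializer=init_worker, initargs=(queue,))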

@Delgan Delgan added bug Something isn't working and removed enhancement Improvement to an already existing feature labels Oct 20, 2019
@Delgan
Owner

Delgan commented Oct 20, 2019

Ok, I finally managed to fix that. 😛

Dealing with multiprocessing on Windows is really painful, especially considering that Loguru heavily relies on one unique global logger.

I had to rethink the management of handlers (see #157), it's not perfect but the final solution is satisfactory enough in my opinion.

Basically, you will need to make your child processes "inherit" from the parent logger. It should not be passed as an argument once the child is started; it should be passed during construction of the child. To do so, most multiprocessing functions offer initializer and initargs arguments. This also requires added handlers to be picklable or, alternatively, to be added with enqueue=True.

So, the initial snippet of this issue may look like this once updated:

import sys
import time
import random
from concurrent.futures import ProcessPoolExecutor

from loguru import logger

def set_logger(logger_):
    global logger
    logger = logger_

def worker(seconds):
    time.sleep(seconds)
    logger.success("My function executed successfully")


def main():

    with ProcessPoolExecutor(initializer=set_logger, initargs=(logger, )) as executor:
        seconds = [random.randint(1, 3) for i in range(10)]
        executor.map(worker, seconds)


if __name__ == "__main__":
    logger.remove()
    logger.add(sys.stderr, enqueue=True)

    main()

I added some code snippets and explanations in the documentation; I highly recommend reading them if you need to deal with multiprocessing on Windows.

I will leave this issue open until the next Loguru release. In the meantime, feel free to suggest improvements to the documentation or ask any questions.

@chkoar
Author

chkoar commented Oct 20, 2019

@Delgan that is great! When do you plan to make a new release?

@Delgan
Owner

Delgan commented Oct 20, 2019

@chkoar My "goal" was to publish v0.4.0 before the end of the month. However, I also planed to finalize fix for this issue last week-end. Considering how bad I am at estimating development time, you better take this with a grain of salt. 🙂

@contang0

Wow, amazing, looking forward to it! Sorry for not having posted my code. I will test this out once released.

@danmou

danmou commented Oct 26, 2019

BTW, this problem also happens on Linux when using the 'forkserver' start method.
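For reference, a hedged one-liner to trigger the same behavior on Linux (like "spawn", 'forkserver' children do not share handlers added under __main__ in the parent):

import multiprocessing

multiprocessing.set_start_method("forkserver")  # must be called once, under __main__, before starting processes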

@contang0

contang0 commented Nov 5, 2019

@rclapp

rclapp commented Nov 20, 2019

I refactored some of my code to leverage the class and static-method approach. When I have sys.stdout as one of my sinks prior to initializing my pool, I get a pickle error.

If I initialize my pool and then add sys.stdout as a sink, it works okay.

Traceback (most recent call last):
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 115, in _main
self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
2019-11-19 18:20:47.052 | ERROR | __main__:main:323 - cannot serialize '_io.TextIOWrapper' object
Traceback (most recent call last):

File "C:\Program Files\JetBrains\PyCharm 2019.2.2\helpers\pydev\pydevd.py", line 2108, in
main()
└ <function main at 0x000002647E2C6A68>

File "C:\Program Files\JetBrains\PyCharm 2019.2.2\helpers\pydev\pydevd.py", line 2099, in main
globals = debugger.run(setup['file'], None, None, is_module)
│ │ │ └ False
│ │ └ {'port': 56359, 'vm_type': None, 'client': '127.0.0.1', 'server': False, 'DEBUG_RECORD_SOCKET_READS': False, 'multiproc': Tru...
│ └ <function PyDB.run at 0x000002647E2C4A68>
└ <__main__.PyDB object at 0x00000264735B5A48>

File "C:\Program Files\JetBrains\PyCharm 2019.2.2\helpers\pydev\pydevd.py", line 1408, in run
return self._exec(is_module, entry_point_fn, module_name, file, globals, locals)
│ │ │ │ │ │ │ └ {'__name__': '__main__', '__doc__': ' Program to run the N-TIer Allocations.\n\nUsage:\n Management_Allocations.py [run] (...
│ │ │ │ │ │ └ {'__name__': '__main__', '__doc__': ' Program to run the N-TIer Allocations.\n\nUsage:\n Management_Allocations.py [run] (...
│ │ │ │ │ └ 'C:/Dev/AWS_Management_Allocations/src/Management_Allocations.py'
│ │ │ │ └ None
│ │ │ └ ''
│ │ └ False
│ └ <function PyDB._exec at 0x000002647E2C4AF8>
└ <__main__.PyDB object at 0x00000264735B5A48>

File "C:\Program Files\JetBrains\PyCharm 2019.2.2\helpers\pydev\pydevd.py", line 1415, in _exec
pydev_imports.execfile(file, globals, locals) # execute the script
│ │ │ │ └ {'__name__': '__main__', '__doc__': ' Program to run the N-TIer Allocations.\n\nUsage:\n Management_Allocations.py [run] (...
│ │ │ └ {'__name__': '__main__', '__doc__': ' Program to run the N-TIer Allocations.\n\nUsage:\n Management_Allocations.py [run] (...
│ │ └ 'C:/Dev/AWS_Management_Allocations/src/Management_Allocations.py'
│ └ <function execfile at 0x000002647C6A2F78>
└ <module '_pydev_bundle.pydev_imports' from 'C:\Program Files\JetBrains\PyCharm 2019.2.2\helpers\pydev\_pydev_bundle\py...

File "C:\Program Files\JetBrains\PyCharm 2019.2.2\helpers\pydev_pydev_imps_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
│ │ │ └ {'__name__': '__main__', '__doc__': ' Program to run the N-TIer Allocations.\n\nUsage:\n Management_Allocations.py [run] (...
│ │ └ {'__name__': '__main__', '__doc__': ' Program to run the N-TIer Allocations.\n\nUsage:\n Management_Allocations.py [run] (...
│ └ 'C:/Dev/AWS_Management_Allocations/src/Management_Allocations.py'
└ '""" Program to run the N-TIer Allocations.\n\nUsage:\n Management_Allocations.py [run] (-i INSTANCE) (-c CLUSTER) (-s SCE...

File "C:/Dev/AWS_Management_Allocations/src\Management_Allocations.py", line 331, in
main()
└ <function main at 0x0000026400CDF318>

File "C:/Dev/AWS_Management_Allocations/src\Management_Allocations.py", line 321, in main
run_allocations(tm1)
│ └ <TM1py.Services.TM1Service.TM1Service object at 0x0000026400859208>
└ <function run_allocations at 0x0000026400CDFB88>

File "C:/Dev/AWS_Management_Allocations/src\Management_Allocations.py", line 291, in run_allocations
pool.close()
└ None

File "C:/Dev/AWS_Management_Allocations/src\Management_Allocations.py", line 226, in run_allocations
pool = init_processing_pool(worker)
│ └ <aws_management_allocations.allocation_worker.AllocationWorker object at 0x00000264012E0D48>
└ <function init_processing_pool at 0x0000026400CDC678>

File "C:/Dev/AWS_Management_Allocations/src\Management_Allocations.py", line 145, in init_processing_pool
raise e

File "C:/Dev/AWS_Management_Allocations/src\Management_Allocations.py", line 142, in init_processing_pool
pool = mp.Pool(cores, initializer=worker.set_logger, initargs=(logger, ))
│ │ │ │ │ └ <loguru.logger handlers=[(id=1, level=10, sink=<stdout>), (id=2, level=20, sink=C:\dev_new\logtest.log)]>
│ │ │ │ └ <staticmethod object at 0x0000026400C66108>
│ │ │ └ <aws_management_allocations.allocation_worker.AllocationWorker object at 0x00000264012E0D48>
│ │ └ 4
│ └ <bound method BaseContext.Pool of <multiprocessing.context.DefaultContext object at 0x000002647E122A88>>
└ <module 'multiprocessing' from 'C:\Program Files\Python37\lib\multiprocessing\__init__.py'>

File "C:\Program Files\Python37\lib\multiprocessing\context.py", line 119, in Pool
context=self.get_context())
│ └ <function DefaultContext.get_context at 0x000002647E414438>
└ <multiprocessing.context.DefaultContext object at 0x000002647E122A88>

File "C:\Program Files\Python37\lib\multiprocessing\pool.py", line 176, in init
self._repopulate_pool()
│ └ <function Pool._repopulate_pool at 0x0000026400E89168>
└ <multiprocessing.pool.Pool object at 0x0000026400859388>

File "C:\Program Files\Python37\lib\multiprocessing\pool.py", line 241, in _repopulate_pool
w.start()
│ └ <function BaseProcess.start at 0x000002647E40A318>
└ <SpawnProcess(SpawnPoolWorker-1, initial daemon)>

File "C:\Program Files\Python37\lib\multiprocessing\process.py", line 112, in start
self._popen = self._Popen(self)
│ │ │ │ └ <SpawnProcess(SpawnPoolWorker-1, initial daemon)>
│ │ │ └ <staticmethod object at 0x000002647E3FAF88>
│ │ └ <SpawnProcess(SpawnPoolWorker-1, initial daemon)>
│ └ None
└ <SpawnProcess(SpawnPoolWorker-1, initial daemon)>

File "C:\Program Files\Python37\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
│ └ <SpawnProcess(SpawnPoolWorker-1, initial daemon)>
└ <class 'multiprocessing.popen_spawn_win32.Popen'>

File "C:\Program Files\Python37\lib\multiprocessing\popen_spawn_win32.py", line 89, in init
reduction.dump(process_obj, to_child)
│ │ │ └ <_io.BufferedWriter name=5>
│ │ └ <SpawnProcess(SpawnPoolWorker-1, initial daemon)>
│ └ <function dump at 0x000002647E405CA8>
└ <module 'multiprocessing.reduction' from 'C:\Program Files\Python37\lib\multiprocessing\reduction.py'>

File "C:\Program Files\Python37\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
│ │ │ └ <SpawnProcess(SpawnPoolWorker-1, initial daemon)>
│ │ └ None
│ └ <_io.BufferedWriter name=5>
└ <class 'multiprocessing.reduction.ForkingPickler'>

TypeError: cannot serialize '_io.TextIOWrapper' object

@Delgan
Owner

Delgan commented Nov 20, 2019

@Joolius That's an interesting new module. However, I'm not sure it's suitable for Loguru. It's useful for sharing data between processes, but it requires the data to be serialized to bytes, and it doesn't do much else. Actually, I think the multiprocessing issues are mostly solved with the new implementation and the queue argument. It just remains for me to publish v0.4.0. 🙂

@rclapp Indeed, this is the "expected" behavior with the new version. While initializing sub-processes on Windows, Python requires the init arguments to be picklable. However, stdout is not picklable, so you need to use enqueue=True while adding it. If you add the handler after starting your sub-process, it might work, but it implies the stdout handlers are not synchronized among your processes, which may result in overlapping logs.
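A minimal illustration of the distinction (hedged sketch; only the second form can travel with the logger when the pool workers are spawned):

import sys

from loguru import logger

# Not picklable: the raw stream would have to be serialized along with the logger.
# logger.add(sys.stdout, level="INFO")

# Picklable setup: messages are routed through an internal queue instead.
logger.add(sys.stdout, level="INFO", enqueue=True)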

@rclapp

rclapp commented Nov 20, 2019

@Delgan Thanks, I forgot to add that parameter; that resolves the error. However, I now have a new set of errors. Any light you can shed on this would be helpful.

--- Logging error in Loguru Handler #2 ---
Record was: {'elapsed': datetime.timedelta(seconds=24, microseconds=406505), 'exception': None, 'extra': {}, 'file': (name='allocation_worker.py', path='C:\\Dev\\AWS_Management_Allocations\\src\\aws_management_allocations\\allocation_worker.py'), 'function': 'zero_out_allocations', 'level': (name='DEBUG', no=10, icon='🐞'), 'line': 160, 'message': 'Zero Out Process Complete: Actual, IAD, 201801, AWS', 'module': 'allocation_worker', 'name': 'aws_management_allocations.allocation_worker', 'process': (id=10380, name='SpawnPoolWorker-1'), 'thread': (id=9636, name='MainThread'), 'time': datetime(2019, 11, 20, 14, 52, 47, 948239, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=57600), 'Pacific Standard Time'))}
Traceback (most recent call last):
  File "C:\Program Files\Python37\lib\site-packages\loguru-0.3.2-py3.7.egg\loguru\_handler.py", line 250, in _queued_writer
    message = queue.get()
  File "C:\Program Files\Python37\lib\multiprocessing\queues.py", line 354, in get
    return _ForkingPickler.loads(res)
TypeError: __init__() missing 2 required positional arguments: 'status_code' and 'reason'
--- End of logging error ---
Traceback (most recent call last):
  File "C:\Program Files\Python37\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Program Files\Python37\lib\multiprocessing\pool.py", line 470, in _handle_results
    task = get()
  File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
TypeError: __init__() missing 2 required positional arguments: 'status_code' and 'reason'
Exception in thread Thread-18:
Traceback (most recent call last):
  File "C:\Program Files\Python37\lib\threading.py", line 926, in _bootstrap_inner
    self.run()
  File "C:\Program Files\Python37\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Program Files\Python37\lib\multiprocessing\pool.py", line 470, in _handle_results
    task = get()
  File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
TypeError: __init__() missing 2 required positional arguments: 'status_code' and 'reason'

@Delgan
Owner

Delgan commented Nov 21, 2019

@rclapp Hmm... Would it be possible to see the handlers you are using, please?

@rclapp

rclapp commented Nov 21, 2019 via email

@Delgan
Owner

Delgan commented Nov 22, 2019

@rclapp Thanks. It does not seem to be related to your handlers. It's hard to tell without seeing your whole code, but I bet you are somehow attaching a non-picklable instance to the record dict, maybe using logger.bind()?

When enqueue=True, the record needs to be serialized, passed to a Queue, and then de-serialized. According to the traceback, an error occurs while calling queue.get(), so the de-serialization step is at fault. I don't know why, as I don't know which object is being de-serialized (it looks like a requests response used by some AWS lib).

Actually, I had been misled by the Record was: ... logging message: since message = queue.get() failed, the record actually displayed is the previous one, which was processed without error.

I think you need to figure out where such a "non-picklable" object is bound to the logger. 😕
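For illustration only, a hedged sketch of the kind of pattern that could trigger this (the response class is hypothetical, standing in for a non-picklable object such as a live HTTP response):

from loguru import logger


class FakeResponse:
    # Stand-in for a non-picklable object; real responses often hold sockets
    # or locks that pickle rejects, or fail to rebuild on the other side.
    def __reduce__(self):
        raise TypeError("not picklable")


resp = FakeResponse()

# Problematic with enqueue=True: the whole object would travel through the queue.
# logger.bind(response=resp).info("request done")

# Safer: bind only plain, picklable values extracted from the object.
logger.bind(status_code=200, reason="OK").info("request done")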

@Delgan
Owner

Delgan commented Dec 1, 2019

I can finally close this issue! v0.4.0 has just been released and should solve this problem. 👍

As a reminder:

I added some code snippets and explanations in the documentation; I highly recommend reading them if you need to deal with multiprocessing on Windows.

@wytxty

wytxty commented Aug 17, 2023

Ok, I finally managed to fix that. 😛 [...] So, the initial snippet of this issue may look like this once updated: [...]

I got the error: cannot pickle '_io.TextIOWrapper' object... Any idea?

@Delgan
Owner

Delgan commented Aug 17, 2023

@wytxty The logger you're passing to the ProcessPoolExecutor likely contains sinks which weren't configured with enqueue=True.

If you need further assistance, please open a new issue and detail your configuration.
