11
11
use Symfony \Component \Console \Input \InputOption ;
12
12
use Symfony \Component \Console \Output \OutputInterface ;
13
13
use Magento \Framework \MessageQueue \ConsumerFactory ;
14
- use Magento \MessageQueue \ Model \ Cron \ ConsumersRunner \ PidConsumerManager ;
14
+ use Magento \Framework \ Lock \ LockManagerInterface ;
15
15
16
16
/**
17
17
* Command for starting MessageQueue consumers.
@@ -22,6 +22,7 @@ class StartConsumerCommand extends Command
22
22
const OPTION_NUMBER_OF_MESSAGES = 'max-messages ' ;
23
23
const OPTION_BATCH_SIZE = 'batch-size ' ;
24
24
const OPTION_AREACODE = 'area-code ' ;
25
+ const OPTION_SINGLE_THREAD = 'single-thread ' ;
25
26
const PID_FILE_PATH = 'pid-file-path ' ;
26
27
const COMMAND_QUEUE_CONSUMERS_START = 'queue:consumers:start ' ;
27
28
@@ -36,9 +37,9 @@ class StartConsumerCommand extends Command
36
37
private $ appState ;
37
38
38
39
/**
39
- * @var PidConsumerManager
40
+ * @var LockManagerInterface
40
41
*/
41
- private $ pidConsumerManager ;
42
+ private $ lockManager ;
42
43
43
44
/**
44
45
* StartConsumerCommand constructor.
@@ -47,54 +48,60 @@ class StartConsumerCommand extends Command
47
48
* @param \Magento\Framework\App\State $appState
48
49
* @param ConsumerFactory $consumerFactory
49
50
* @param string $name
50
- * @param PidConsumerManager $pidConsumerManager
51
+ * @param LockManagerInterface $lockManager
51
52
*/
52
53
public function __construct (
53
54
\Magento \Framework \App \State $ appState ,
54
55
ConsumerFactory $ consumerFactory ,
55
56
$ name = null ,
56
- PidConsumerManager $ pidConsumerManager = null
57
+ LockManagerInterface $ lockManager = null
57
58
) {
58
59
$ this ->appState = $ appState ;
59
60
$ this ->consumerFactory = $ consumerFactory ;
60
- $ this ->pidConsumerManager = $ pidConsumerManager ?: \Magento \Framework \App \ObjectManager::getInstance ()
61
- ->get (PidConsumerManager ::class);
61
+ $ this ->lockManager = $ lockManager ?: \Magento \Framework \App \ObjectManager::getInstance ()
62
+ ->get (LockManagerInterface ::class);
62
63
parent ::__construct ($ name );
63
64
}
64
65
65
66
/**
66
- * { @inheritdoc}
67
+ * @inheritdoc
67
68
*/
68
69
protected function execute (InputInterface $ input , OutputInterface $ output )
69
70
{
70
71
$ consumerName = $ input ->getArgument (self ::ARGUMENT_CONSUMER );
71
72
$ numberOfMessages = $ input ->getOption (self ::OPTION_NUMBER_OF_MESSAGES );
72
73
$ batchSize = (int )$ input ->getOption (self ::OPTION_BATCH_SIZE );
73
74
$ areaCode = $ input ->getOption (self ::OPTION_AREACODE );
74
- $ pidFilePath = $ input ->getOption (self ::PID_FILE_PATH );
75
75
76
- if ($ pidFilePath && $ this ->pidConsumerManager ->isRun ($ pidFilePath )) {
77
- $ output ->writeln ('<error>Consumer with the same PID is running</error> ' );
78
- return \Magento \Framework \Console \Cli::RETURN_FAILURE ;
76
+ if ($ input ->getOption (self ::PID_FILE_PATH )) {
77
+ $ input ->setOption (self ::OPTION_SINGLE_THREAD , true );
79
78
}
80
79
81
- if ($ pidFilePath ) {
82
- $ this ->pidConsumerManager ->savePid ($ pidFilePath );
80
+ $ singleThread = $ input ->getOption (self ::OPTION_SINGLE_THREAD );
81
+
82
+ if ($ singleThread && $ this ->lockManager ->isLocked (md5 ($ consumerName ))) { //phpcs:ignore
83
+ $ output ->writeln ('<error>Consumer with the same name is running</error> ' );
84
+ return \Magento \Framework \Console \Cli::RETURN_FAILURE ;
83
85
}
84
86
85
- if ($ areaCode !== null ) {
86
- $ this ->appState ->setAreaCode ($ areaCode );
87
- } else {
88
- $ this ->appState ->setAreaCode ('global ' );
87
+ if ($ singleThread ) {
88
+ $ this ->lockManager ->lock (md5 ($ consumerName )); //phpcs:ignore
89
89
}
90
90
91
+ $ this ->appState ->setAreaCode ($ areaCode ?? 'global ' );
92
+
91
93
$ consumer = $ this ->consumerFactory ->get ($ consumerName , $ batchSize );
92
94
$ consumer ->process ($ numberOfMessages );
95
+
96
+ if ($ singleThread ) {
97
+ $ this ->lockManager ->unlock (md5 ($ consumerName )); //phpcs:ignore
98
+ }
99
+
93
100
return \Magento \Framework \Console \Cli::RETURN_SUCCESS ;
94
101
}
95
102
96
103
/**
97
- * { @inheritdoc}
104
+ * @inheritdoc
98
105
*/
99
106
protected function configure ()
100
107
{
@@ -125,11 +132,17 @@ protected function configure()
125
132
'The preferred area (global, adminhtml, etc...) '
126
133
. 'default is global. '
127
134
);
135
+ $ this ->addOption (
136
+ self ::OPTION_SINGLE_THREAD ,
137
+ null ,
138
+ InputOption::VALUE_NONE ,
139
+ 'This option prevents running multiple copies of one consumer simultaneously. '
140
+ );
128
141
$ this ->addOption (
129
142
self ::PID_FILE_PATH ,
130
143
null ,
131
144
InputOption::VALUE_REQUIRED ,
132
- 'The file path for saving PID '
145
+ 'The file path for saving PID (This option is deprecated, use --single-thread instead) '
133
146
);
134
147
$ this ->setHelp (
135
148
<<<HELP
@@ -150,8 +163,12 @@ protected function configure()
150
163
To specify the preferred area:
151
164
152
165
<comment>%command.full_name% someConsumer --area-code='adminhtml'</comment>
166
+
167
+ To do not run multiple copies of one consumer simultaneously:
168
+
169
+ <comment>%command.full_name% someConsumer --single-thread'</comment>
153
170
154
- To save PID enter path:
171
+ To save PID enter path (This option is deprecated, use --single-thread instead) :
155
172
156
173
<comment>%command.full_name% someConsumer --pid-file-path='/var/someConsumer.pid'</comment>
157
174
HELP
0 commit comments