Mpall [patched] Now
formatter = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Console handler console = logging.StreamHandler(sys.stdout) console.setLevel(logging.DEBUG if verbose else logging.INFO) console.setFormatter(formatter) self.logger.addHandler(console) # File handler (if specified) if log_file: file_handler = logging.FileHandler(log_file) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) self.logger.addHandler(file_handler)
I'll produce a complete feature called mpall (Multi-Process All-in-One Launcher) – a command-line tool to run commands across multiple processes, with logging, retries, timeouts, and output aggregation. Feature: mpall Purpose Execute a command across multiple parallel processes (e.g., for batch processing, stress testing, or parallel data transformation) with unified output handling and error recovery. Files Structure mpall/ ├── mpall.py # Main CLI implementation ├── README.md # Documentation └── tests/ # Unit tests mpall.py (Complete Implementation) #!/usr/bin/env python3 """ mpall - Multi-Process All-in-One Launcher Run a command across multiple parallel processes with logging, retries, timeouts, and aggregated output. """ formatter = logging
mpall -c "convert.py {input} {output}" -f tasks.txt -w 4 </code></pre> <h3>With retries and timeout</h3> <pre><code class="language-bash">mpall -c "flaky_command {arg}" -r arg=test --retries 3 -t 30 </code></pre> <h3>Save results</h3> <pre><code class="language-bash">mpall -c "test.sh {param}" -r param=value -o results.json --summary summary.txt </code></pre> <h2>Options</h2> <p>| Option | Description | |--------|-------------| | <code>-c, --command</code> | Command to execute (required) | | <code>-r, --replace</code> | Replacements (key=val pairs) | | <code>-f, --replace-file</code> | File with replacements | | <code>-w, --workers</code> | Parallel workers (default: 4) | | <code>-t, --timeout</code> | Timeout per task in seconds (default: 60) | | <code>--retries</code> | Retry count on failure (default: 0) | | <code>-v, --verbose</code> | Show stdout/stderr | | <code>--log-file</code> | Save logs to file | | <code>-o, --output-json</code> | Export results to JSON | | <code>--output-summary</code> | Export summary to text | | <code>-e, --env</code> | Set environment variable |</p> <h2>Exit Codes</h2> <ul> <li><code>0</code> - All tasks succeeded</li> <li><code>1</code> - One or more tasks failed or invalid arguments</li> </ul> <h2>Use Cases</h2> <ul> <li><strong>Batch processing</strong> (convert images, compress files)</li> <li><strong>Stress testing</strong> (run load tests with different parameters)</li> <li><strong>Data validation</strong> (validate multiple datasets in parallel)</li> <li><strong>API testing</strong> (call multiple endpoints concurrently)</li> <li><strong>Deployment scripts</strong> (deploy to multiple servers)</li> </ul> <h2>License</h2> <p>MIT</p> <pre><code> --- """ mpall -c "convert
parser.add_argument( "-v", "--verbose", action="store_true", help="Verbose output (shows stdout/stderr)" ) With retries and timeout
def run(self) -> int:
    """Main execution entry point.

    Parses replacements, fans tasks out to a process pool, collects and
    logs results, prints/saves summaries, and returns a shell exit code.

    Returns:
        0 if every task succeeded, 1 on any failure or missing input.
    """
    # Parse replacements; without at least one task there is nothing to run.
    replacements_list = self.parse_replacements()
    if not replacements_list:
        self.logger.error("No replacements provided")
        return 1

    total_tasks = len(replacements_list)
    # BUG FIX: the original f-string had its braces stripped, so the literal
    # text "total_tasks"/"self.args.workers" was logged instead of the values.
    self.logger.info(f"Starting {total_tasks} tasks with {self.args.workers} workers")

    # Prepare environment: inherit the current env, overlay -e KEY=VAL pairs.
    env = os.environ.copy()
    if self.args.env:
        for env_var in self.args.env:
            if '=' in env_var:
                k, v = env_var.split('=', 1)  # split once: values may contain '='
                env[k] = v

    # Execute tasks in parallel.
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=self.args.workers) as executor:
        futures = {}
        for idx, replacements in enumerate(replacements_list):
            if self.cancel:  # honor cancellation (e.g. SIGINT handler) mid-submit
                break
            future = executor.submit(
                worker,
                idx,
                self.args.command,
                replacements,
                self.args.timeout,
                self.args.retries,
                env
            )
            futures[future] = idx

        # Collect results as they complete, logging each one.
        for future in as_completed(futures):
            if self.cancel:
                break
            result = future.result()
            self.results.append(result)
            self._log_result(result)

    total_duration = time.time() - start_time

    # Summary to console, then optional JSON / text exports.
    self._print_summary(total_tasks, total_duration)
    if self.args.output_json:
        self._save_results_json()
    if self.args.output_summary:
        self._save_summary_text()

    # Exit code contract: 0 only if every collected result succeeded.
    return 0 if all(r.success for r in self.results) else 1
def _save_summary_text(self): """Save summary to text file.""" with open(self.args.output_summary, 'w') as f: f.write("MPALL EXECUTION SUMMARY\n") f.write("=" * 50 + "\n") for r in self.results: f.write(f"Task r.task_id: 'SUCCESS' if r.success else 'FAIL'\n") if not r.success: f.write(f" Error: r.stderr[:200]\n") def main(): parser = argparse.ArgumentParser( description="mpall - Run commands across multiple parallel processes", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: mpall -w 2 -c echo hello -r {} Replace placeholders mpall -c "echo name is age" -r name=alice,age=30 -r name=bob,age=25 Use replacement file mpall -c "process_file.py input output" -f replacements.txt -w 4 With retries and timeout mpall -c "curl url" -r url=http://example.com -t 30 --retries 3 Save results mpall -c "test.sh param" -r param=1 -o results.json --summary summary.txt """ )