addons-cm/stock_picking_batch_custom/models/stock_picking_batch.py

284 lines
11 KiB
Python

# Copyright 2026 Criptomart
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import api
from odoo import fields
from odoo import models
from odoo.exceptions import UserError
class StockPickingBatch(models.Model):
_inherit = "stock.picking.batch"
summary_line_ids = fields.One2many(
comodel_name="stock.picking.batch.summary.line",
inverse_name="batch_id",
string="Product Summary",
compute="_compute_summary_line_ids",
store=True,
readonly=True,
)
@api.depends(
"move_line_ids.move_id.product_id",
"move_line_ids.move_id.product_uom_qty",
"move_line_ids.move_id.quantity",
"move_line_ids.move_id.product_uom",
"move_line_ids.move_id.state",
)
def _compute_summary_line_ids(self):
"""Aggregate move quantities per product and keep collected flag.
- Demand: move.product_uom_qty converted to product UoM.
- Done: move.quantity converted to product UoM.
- Pending: demand - done.
- Keep is_collected value if product already present.
- Skip cancelled moves.
"""
for batch in self:
# Base: limpiar si no hay líneas
if not batch.move_line_ids:
batch.summary_line_ids = [fields.Command.clear()]
continue
previous_collected = {
line.product_id.id: line.is_collected
for line in batch.summary_line_ids
if line.product_id
}
# Use regular dict to avoid accidental creation of empty entries
aggregates = {}
# Demand per move (count once per move)
for move in batch.move_ids:
if move.state == "cancel" or not move.product_id:
continue
product = move.product_id
product_uom = product.uom_id
demand = move.product_uom._compute_quantity(
move.product_uom_qty, product_uom, rounding_method="HALF-UP"
)
if product.id not in aggregates:
aggregates[product.id] = {
"product_id": product.id,
"product_uom_id": product_uom.id,
"qty_demanded": 0.0,
"qty_done": 0.0,
}
aggregates[product.id]["qty_demanded"] += demand
# Done per move line
for line in batch.move_line_ids:
move = line.move_id
if move and move.state == "cancel":
continue
product = line.product_id
if not product:
continue
product_uom = product.uom_id
done = line.product_uom_id._compute_quantity(
line.quantity, product_uom, rounding_method="HALF-UP"
)
if product.id not in aggregates:
aggregates[product.id] = {
"product_id": product.id,
"product_uom_id": product_uom.id,
"qty_demanded": 0.0,
"qty_done": 0.0,
}
aggregates[product.id]["qty_done"] += done
commands = []
products_in_totals = set()
existing_by_product = {
line.product_id.id: line for line in batch.summary_line_ids
}
for product_id, values in aggregates.items():
# Double-check: skip if product_id is not valid
if not product_id or not values.get("product_id"):
continue
products_in_totals.add(product_id)
existing_line = existing_by_product.get(product_id)
is_collected = previous_collected.get(product_id, False)
if existing_line:
commands.append(
fields.Command.update(
existing_line.id,
{
"product_uom_id": values["product_uom_id"],
"qty_demanded": values["qty_demanded"],
"qty_done": values["qty_done"],
"is_collected": is_collected,
},
)
)
else:
# Ensure all required fields are present before create
if values["product_id"] and values["product_uom_id"]:
commands.append(
fields.Command.create(
{
"product_id": values["product_id"],
"product_uom_id": values["product_uom_id"],
"qty_demanded": values["qty_demanded"],
"qty_done": values["qty_done"],
"is_collected": is_collected,
}
)
)
obsolete_lines = [
fields.Command.unlink(line.id)
for line in batch.summary_line_ids
if line.product_id.id not in products_in_totals
]
if commands or obsolete_lines:
batch.summary_line_ids = commands + obsolete_lines
else:
batch.summary_line_ids = [fields.Command.clear()]
def _raise_collected_restriction(self, base_message, product_names):
message = self.env._(base_message)
if product_names:
message += "\n" + self.env._(
"Pending products: %(products)s", products=product_names
)
raise UserError(message)
def _get_not_collected_summary_lines(self, batch_pickings, scope):
summary_lines = self.summary_line_ids
if scope == "processed":
processed_product_ids = set(
batch_pickings.move_line_ids.filtered(
lambda line: (
line.move_id.state not in ("cancel", "done")
and line.product_id
and line.move_id.quantity > 0
)
).mapped("product_id.id")
)
summary_lines = summary_lines.filtered(
lambda line: line.product_id.id in processed_product_ids
)
return summary_lines.filtered(lambda line: not line.is_collected)
def _get_not_collected_detailed_lines(self, batch_pickings, scope):
detailed_lines = batch_pickings.move_line_ids.filtered(
lambda line: line.move_id.state not in ("cancel", "done")
and line.product_id
)
if scope == "processed":
detailed_lines = detailed_lines.filtered(
lambda line: line.move_id.quantity > 0
)
return detailed_lines.filtered(lambda line: not line.is_collected)
def _check_all_products_collected(self, pickings=None):
"""Validate collected restrictions based on tab configuration."""
for batch in self:
company = batch.company_id or self.env.company
batch_pickings = (
pickings.filtered(lambda p, batch=batch: p.batch_id == batch)
if pickings
else batch.picking_ids
)
if not batch_pickings:
continue
if company.batch_detailed_restriction_enabled:
not_collected_detailed = batch._get_not_collected_detailed_lines(
batch_pickings, company.batch_detailed_restriction_scope
)
if not_collected_detailed:
product_names = ", ".join(
sorted(
set(
not_collected_detailed.mapped("product_id.display_name")
)
)
)
batch._raise_collected_restriction(
"You must mark detailed operation lines as collected before validating the batch.",
product_names,
)
if company.batch_summary_restriction_enabled:
not_collected_summary = batch._get_not_collected_summary_lines(
batch_pickings, company.batch_summary_restriction_scope
)
if not_collected_summary:
product_names = ", ".join(
sorted(
set(not_collected_summary.mapped("product_id.display_name"))
)
)
batch._raise_collected_restriction(
"You must mark product summary lines as collected before validating the batch.",
product_names,
)
class StockPickingBatchSummaryLine(models.Model):
_name = "stock.picking.batch.summary.line"
_description = "Batch Product Summary Line"
_order = "product_categ_id, product_id"
_sql_constraints = [
(
"product_required",
"CHECK(product_id IS NOT NULL)",
"Product is required for summary lines.",
),
]
batch_id = fields.Many2one(
comodel_name="stock.picking.batch",
ondelete="cascade",
required=True,
index=True,
)
product_id = fields.Many2one(
comodel_name="product.product",
required=True,
index=True,
)
product_categ_id = fields.Many2one(
comodel_name="product.category",
string="Product Category",
related="product_id.categ_id",
store=True,
readonly=True,
)
product_uom_id = fields.Many2one(
comodel_name="uom.uom",
string="Unit of Measure",
required=True,
)
qty_demanded = fields.Float(
string="Demanded Quantity",
digits="Product Unit of Measure",
required=True,
default=0.0,
)
qty_done = fields.Float(
string="Done Quantity",
digits="Product Unit of Measure",
required=True,
default=0.0,
)
qty_pending = fields.Float(
string="Pending Quantity",
digits="Product Unit of Measure",
compute="_compute_qty_pending",
store=True,
)
is_collected = fields.Boolean(string="Collected", default=False)
@api.depends("qty_demanded", "qty_done")
def _compute_qty_pending(self):
for line in self:
line.qty_pending = line.qty_demanded - line.qty_done