addons-cm/stock_picking_batch_custom/models/stock_picking_batch.py

228 lines
8.1 KiB
Python

# Copyright 2026 Criptomart
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import api
from odoo import fields
from odoo import models
from odoo.exceptions import UserError
class StockPickingBatch(models.Model):
_inherit = "stock.picking.batch"
summary_line_ids = fields.One2many(
comodel_name="stock.picking.batch.summary.line",
inverse_name="batch_id",
string="Product Summary",
compute="_compute_summary_line_ids",
store=True,
readonly=True,
)
@api.depends(
"move_line_ids.move_id.product_id",
"move_line_ids.move_id.product_uom_qty",
"move_line_ids.move_id.quantity",
"move_line_ids.move_id.product_uom",
"move_line_ids.move_id.state",
)
def _compute_summary_line_ids(self):
"""Aggregate move quantities per product and keep collected flag.
- Demand: move.product_uom_qty converted to product UoM.
- Done: move.quantity converted to product UoM.
- Pending: demand - done.
- Keep is_collected value if product already present.
- Skip cancelled moves.
"""
for batch in self:
# Base: limpiar si no hay líneas
if not batch.move_line_ids:
batch.summary_line_ids = [fields.Command.clear()]
continue
previous_collected = {
line.product_id.id: line.is_collected
for line in batch.summary_line_ids
if line.product_id
}
# Use regular dict to avoid accidental creation of empty entries
aggregates = {}
# Demand per move (count once per move)
for move in batch.move_ids:
if move.state == "cancel" or not move.product_id:
continue
product = move.product_id
product_uom = product.uom_id
demand = move.product_uom._compute_quantity(
move.product_uom_qty, product_uom, rounding_method="HALF-UP"
)
if product.id not in aggregates:
aggregates[product.id] = {
"product_id": product.id,
"product_uom_id": product_uom.id,
"qty_demanded": 0.0,
"qty_done": 0.0,
}
aggregates[product.id]["qty_demanded"] += demand
# Done per move line
for line in batch.move_line_ids:
move = line.move_id
if move and move.state == "cancel":
continue
product = line.product_id
if not product:
continue
product_uom = product.uom_id
done = line.product_uom_id._compute_quantity(
line.quantity, product_uom, rounding_method="HALF-UP"
)
if product.id not in aggregates:
aggregates[product.id] = {
"product_id": product.id,
"product_uom_id": product_uom.id,
"qty_demanded": 0.0,
"qty_done": 0.0,
}
aggregates[product.id]["qty_done"] += done
commands = []
products_in_totals = set()
existing_by_product = {
line.product_id.id: line for line in batch.summary_line_ids
}
for product_id, values in aggregates.items():
# Double-check: skip if product_id is not valid
if not product_id or not values.get("product_id"):
continue
products_in_totals.add(product_id)
existing_line = existing_by_product.get(product_id)
is_collected = previous_collected.get(product_id, False)
if existing_line:
commands.append(
fields.Command.update(
existing_line.id,
{
"product_uom_id": values["product_uom_id"],
"qty_demanded": values["qty_demanded"],
"qty_done": values["qty_done"],
"is_collected": is_collected,
},
)
)
else:
# Ensure all required fields are present before create
if values["product_id"] and values["product_uom_id"]:
commands.append(
fields.Command.create(
{
"product_id": values["product_id"],
"product_uom_id": values["product_uom_id"],
"qty_demanded": values["qty_demanded"],
"qty_done": values["qty_done"],
"is_collected": is_collected,
}
)
)
obsolete_lines = [
fields.Command.unlink(line.id)
for line in batch.summary_line_ids
if line.product_id.id not in products_in_totals
]
if commands or obsolete_lines:
batch.summary_line_ids = commands + obsolete_lines
else:
batch.summary_line_ids = [fields.Command.clear()]
def _check_all_products_collected(self):
"""Ensure all product summary lines are marked as collected before done."""
for batch in self:
not_collected_lines = batch.summary_line_ids.filtered(
lambda line: not line.is_collected
)
if not not_collected_lines:
continue
product_names = ", ".join(
not_collected_lines.mapped("product_id.display_name")
)
message = batch.env._(
"You must mark all product lines as collected before validating the batch."
)
if product_names:
message += "\n" + batch.env._(
"Pending products: %(products)s", products=product_names
)
raise UserError(message)
def action_done(self):
self._check_all_products_collected()
return super().action_done()
class StockPickingBatchSummaryLine(models.Model):
_name = "stock.picking.batch.summary.line"
_description = "Batch Product Summary Line"
_order = "product_categ_id, product_id"
_sql_constraints = [
(
"product_required",
"CHECK(product_id IS NOT NULL)",
"Product is required for summary lines.",
),
]
batch_id = fields.Many2one(
comodel_name="stock.picking.batch",
ondelete="cascade",
required=True,
index=True,
)
product_id = fields.Many2one(
comodel_name="product.product",
required=True,
index=True,
)
product_categ_id = fields.Many2one(
comodel_name="product.category",
string="Product Category",
related="product_id.categ_id",
store=True,
readonly=True,
)
product_uom_id = fields.Many2one(
comodel_name="uom.uom",
string="Unit of Measure",
required=True,
)
qty_demanded = fields.Float(
string="Demanded Quantity",
digits="Product Unit of Measure",
required=True,
default=0.0,
)
qty_done = fields.Float(
string="Done Quantity",
digits="Product Unit of Measure",
required=True,
default=0.0,
)
qty_pending = fields.Float(
string="Pending Quantity",
digits="Product Unit of Measure",
compute="_compute_qty_pending",
store=True,
)
is_collected = fields.Boolean(string="Collected", default=False)
@api.depends("qty_demanded", "qty_done")
def _compute_qty_pending(self):
for line in self:
line.qty_pending = line.qty_demanded - line.qty_done