addons-cm/stock_picking_batch_custom/models/stock_picking_batch.py
snt ce393b6034 [FIX] TestConfirmEskaera_Integration: limpieza de decoradores @patch y corrección de bugs
- Convertir 4 tests de decorador @patch a context manager 'with patch(...)' para evitar RuntimeError en LocalProxy de Werkzeug
- Corregir patrón env(user=..., context=dict(...)) en Odoo 18 (sin .with_context())
- Agregar website real al mock para integración con helpers de pricing (_get_pricing_info)
- Añadir pickup_date en fixture de existing_order para que _find_recent_draft_order localice correctamente
- BUGFIX: Agregar (5,) a order_line para limpiar líneas previas al actualizar pedido existente

Resultado: 0 failed, 0 errors de 4 tests en Docker para TestConfirmEskaera_Integration

BREAKING: _create_or_update_sale_order ahora limpia las líneas anteriores con (5,) antes de asignar las nuevas cuando se actualiza un pedido existente. Comportamiento previo (duplicación de líneas) era un bug.
2026-04-08 17:26:57 +02:00

233 lines
8.3 KiB
Python

# Copyright 2026 Criptomart
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import api
from odoo import fields
from odoo import models
from odoo.exceptions import UserError
class StockPickingBatch(models.Model):
_inherit = "stock.picking.batch"
summary_line_ids = fields.One2many(
comodel_name="stock.picking.batch.summary.line",
inverse_name="batch_id",
string="Product Summary",
compute="_compute_summary_line_ids",
store=True,
readonly=True,
)
@api.depends(
"move_line_ids.move_id.product_id",
"move_line_ids.move_id.product_uom_qty",
"move_line_ids.move_id.quantity",
"move_line_ids.move_id.product_uom",
"move_line_ids.move_id.state",
)
def _compute_summary_line_ids(self):
"""Aggregate move quantities per product and keep collected flag.
- Demand: move.product_uom_qty converted to product UoM.
- Done: move.quantity converted to product UoM.
- Pending: demand - done.
- Keep is_collected value if product already present.
- Skip cancelled moves.
"""
for batch in self:
# Base: limpiar si no hay líneas
if not batch.move_line_ids:
batch.summary_line_ids = [fields.Command.clear()]
continue
previous_collected = {
line.product_id.id: line.is_collected
for line in batch.summary_line_ids
if line.product_id
}
# Use regular dict to avoid accidental creation of empty entries
aggregates = {}
# Demand per move (count once per move)
for move in batch.move_ids:
if move.state == "cancel" or not move.product_id:
continue
product = move.product_id
product_uom = product.uom_id
demand = move.product_uom._compute_quantity(
move.product_uom_qty, product_uom, rounding_method="HALF-UP"
)
if product.id not in aggregates:
aggregates[product.id] = {
"product_id": product.id,
"product_uom_id": product_uom.id,
"qty_demanded": 0.0,
"qty_done": 0.0,
}
aggregates[product.id]["qty_demanded"] += demand
# Done per move line
for line in batch.move_line_ids:
move = line.move_id
if move and move.state == "cancel":
continue
product = line.product_id
if not product:
continue
product_uom = product.uom_id
done = line.product_uom_id._compute_quantity(
line.quantity, product_uom, rounding_method="HALF-UP"
)
if product.id not in aggregates:
aggregates[product.id] = {
"product_id": product.id,
"product_uom_id": product_uom.id,
"qty_demanded": 0.0,
"qty_done": 0.0,
}
aggregates[product.id]["qty_done"] += done
commands = []
products_in_totals = set()
existing_by_product = {
line.product_id.id: line for line in batch.summary_line_ids
}
for product_id, values in aggregates.items():
# Double-check: skip if product_id is not valid
if not product_id or not values.get("product_id"):
continue
products_in_totals.add(product_id)
existing_line = existing_by_product.get(product_id)
is_collected = previous_collected.get(product_id, False)
if existing_line:
commands.append(
fields.Command.update(
existing_line.id,
{
"product_uom_id": values["product_uom_id"],
"qty_demanded": values["qty_demanded"],
"qty_done": values["qty_done"],
"is_collected": is_collected,
},
)
)
else:
# Ensure all required fields are present before create
if values["product_id"] and values["product_uom_id"]:
commands.append(
fields.Command.create(
{
"product_id": values["product_id"],
"product_uom_id": values["product_uom_id"],
"qty_demanded": values["qty_demanded"],
"qty_done": values["qty_done"],
"is_collected": is_collected,
}
)
)
obsolete_lines = [
fields.Command.unlink(line.id)
for line in batch.summary_line_ids
if line.product_id.id not in products_in_totals
]
if commands or obsolete_lines:
batch.summary_line_ids = commands + obsolete_lines
else:
batch.summary_line_ids = [fields.Command.clear()]
def _check_all_products_collected(self, product_ids=None):
"""Ensure collected is checked for processed products only.
The product summary remains informative until Odoo knows which products
are actually being validated after the backorder decision.
"""
for batch in self:
not_collected_lines = batch.summary_line_ids.filtered(
lambda line: (
line.qty_done > 0
and not line.is_collected
and (not product_ids or line.product_id.id in product_ids)
)
)
if not not_collected_lines:
continue
product_names = ", ".join(
not_collected_lines.mapped("product_id.display_name")
)
message = batch.env._(
"You must mark all product lines as collected before validating the batch."
)
if product_names:
message += "\n" + batch.env._(
"Pending products: %(products)s", products=product_names
)
raise UserError(message)
class StockPickingBatchSummaryLine(models.Model):
_name = "stock.picking.batch.summary.line"
_description = "Batch Product Summary Line"
_order = "product_categ_id, product_id"
_sql_constraints = [
(
"product_required",
"CHECK(product_id IS NOT NULL)",
"Product is required for summary lines.",
),
]
batch_id = fields.Many2one(
comodel_name="stock.picking.batch",
ondelete="cascade",
required=True,
index=True,
)
product_id = fields.Many2one(
comodel_name="product.product",
required=True,
index=True,
)
product_categ_id = fields.Many2one(
comodel_name="product.category",
string="Product Category",
related="product_id.categ_id",
store=True,
readonly=True,
)
product_uom_id = fields.Many2one(
comodel_name="uom.uom",
string="Unit of Measure",
required=True,
)
qty_demanded = fields.Float(
string="Demanded Quantity",
digits="Product Unit of Measure",
required=True,
default=0.0,
)
qty_done = fields.Float(
string="Done Quantity",
digits="Product Unit of Measure",
required=True,
default=0.0,
)
qty_pending = fields.Float(
string="Pending Quantity",
digits="Product Unit of Measure",
compute="_compute_qty_pending",
store=True,
)
is_collected = fields.Boolean(string="Collected", default=False)
@api.depends("qty_demanded", "qty_done")
def _compute_qty_pending(self):
for line in self:
line.qty_pending = line.qty_demanded - line.qty_done