addons-cm/website_sale_aplicoop/models/group_order.py
snt 8b0a402ccf [FIX] website_sale_aplicoop: Critical date calculation fixes (v18.0.1.3.1)
- Fixed _compute_cutoff_date logic: Changed days_ahead <= 0 to days_ahead < 0 to allow cutoff_date same day as today
- Enabled store=True for delivery_date field to persist calculated values and enable database filtering
- Added constraint _check_cutoff_before_pickup to validate pickup_day >= cutoff_day in weekly orders
- Added @api.onchange methods for immediate UI feedback when changing cutoff_day or pickup_day
- Created daily cron job _cron_update_dates to automatically recalculate dates for active orders
- Added 'Calculated Dates' section in form view showing readonly cutoff_date, pickup_date, delivery_date
- Added 6 regression tests with @tagged('post_install', 'date_calculations')
- Updated documentation with comprehensive changelog

This is a more robust fix than v18.0.1.2.0, addressing edge cases in date calculations.
2026-02-18 17:45:45 +01:00

598 lines
21 KiB
Python

# Copyright 2025-Today Criptomart
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl)
import logging
from datetime import timedelta
from odoo import _
from odoo import api
from odoo import fields
from odoo import models
from odoo.exceptions import ValidationError
_logger = logging.getLogger(__name__)
class GroupOrder(models.Model):
_name = "group.order"
_description = "Consumer Group Order"
_inherit = ["mail.thread", "mail.activity.mixin"]
_order = "start_date desc"
@staticmethod
def _get_order_type_selection(records):
"""Return order type selection options with translations."""
return [
("regular", _("Regular Order")),
("special", _("Special Order")),
("promotional", _("Promotional Order")),
]
@staticmethod
def _get_period_selection(records):
"""Return period selection options with translations."""
return [
("once", _("One-time")),
("weekly", _("Weekly")),
("biweekly", _("Biweekly")),
("monthly", _("Monthly")),
]
@staticmethod
def _get_day_selection(records):
"""Return day of week selection options with translations."""
return [
("0", _("Monday")),
("1", _("Tuesday")),
("2", _("Wednesday")),
("3", _("Thursday")),
("4", _("Friday")),
("5", _("Saturday")),
("6", _("Sunday")),
]
@staticmethod
def _get_state_selection(records):
"""Return state selection options with translations."""
return [
("draft", _("Draft")),
("open", _("Open")),
("closed", _("Closed")),
("cancelled", _("Cancelled")),
]
# === Multicompañía ===
company_id = fields.Many2one(
"res.company",
string="Company",
required=True,
default=lambda self: self.env.company,
tracking=True,
help="Company that owns this consumer group order",
)
# === Campos básicos ===
name = fields.Char(
string="Name",
required=True,
tracking=True,
translate=True,
help="Display name of this consumer group order",
)
group_ids = fields.Many2many(
"res.partner",
"group_order_group_rel",
"order_id",
"group_id",
string="Consumer Groups",
required=True,
domain=[("is_group", "=", True)],
tracking=True,
help="Consumer groups that can participate in this order",
)
type = fields.Selection(
selection=_get_order_type_selection,
string="Order Type",
required=True,
default="regular",
tracking=True,
help="Type of consumer group order: Regular, Special (one-time), or Promotional",
)
# === Fechas ===
start_date = fields.Date(
string="Start Date",
required=False,
tracking=True,
help="Day when the consumer group order opens for purchases",
)
end_date = fields.Date(
string="End Date",
required=False,
tracking=True,
help="If empty, the consumer group order is permanent",
)
# === Período y días ===
period = fields.Selection(
selection=_get_period_selection,
string="Recurrence Period",
required=True,
default="weekly",
tracking=True,
help="How often this consumer group order repeats",
)
pickup_day = fields.Selection(
selection=_get_day_selection,
string="Pickup Day",
required=False,
tracking=True,
help="Day of the week when members pick up their orders",
)
cutoff_day = fields.Selection(
selection=_get_day_selection,
string="Cutoff Day",
required=False,
tracking=True,
help="Day when purchases stop and the consumer group order is locked for this week.",
)
# === Home delivery ===
home_delivery = fields.Boolean(
string="Home Delivery",
default=False,
tracking=True,
help="Whether this consumer group order includes home delivery service",
)
delivery_product_id = fields.Many2one(
"product.product",
string="Delivery Product",
domain=[("type", "=", "service")],
tracking=True,
help="Product to use for home delivery (service type)",
)
delivery_date = fields.Date(
string="Delivery Date",
compute="_compute_delivery_date",
store=True,
readonly=True,
help="Calculated delivery date (pickup date + 1 day)",
)
# === Computed date fields ===
pickup_date = fields.Date(
string="Pickup Date",
compute="_compute_pickup_date",
store=True,
readonly=True,
help="Calculated next occurrence of pickup day",
)
cutoff_date = fields.Date(
string="Cutoff Date",
compute="_compute_cutoff_date",
store=True,
readonly=True,
help="Calculated next occurrence of cutoff day",
)
# === Asociaciones ===
supplier_ids = fields.Many2many(
"res.partner",
"group_order_supplier_rel",
"order_id",
"supplier_id",
string="Suppliers",
domain=[("supplier_rank", ">", 0)],
tracking=True,
help="Products from these suppliers will be available.",
)
product_ids = fields.Many2many(
"product.product",
"group_order_product_rel",
"order_id",
"product_id",
string="Products",
tracking=True,
help="Directly assigned products.",
)
category_ids = fields.Many2many(
"product.category",
"group_order_category_rel",
"order_id",
"category_id",
string="Categories",
tracking=True,
help="Products in these categories will be available",
)
# === Estado ===
state = fields.Selection(
selection=_get_state_selection,
string="State",
default="draft",
tracking=True,
)
# === Descripción e imagen ===
description = fields.Text(
string="Description",
translate=True,
help="Free text description for this consumer group order",
)
delivery_notice = fields.Text(
string="Delivery Notice",
translate=True,
help="Notice about home delivery displayed to users (shown when home delivery is enabled)",
)
image = fields.Binary(
string="Image",
help="Image displayed alongside the consumer group order name",
attachment=True,
)
display_image = fields.Binary(
string="Display Image",
compute="_compute_display_image",
store=True,
help="Image to display: uses consumer group order image if set, otherwise group image",
attachment=True,
)
@api.depends("image", "group_ids")
def _compute_display_image(self):
"""Use order image if set, otherwise use first group image."""
for record in self:
if record.image:
record.display_image = record.image
elif record.group_ids and record.group_ids[0].image_1920:
record.display_image = record.group_ids[0].image_1920
else:
record.display_image = False
available_products_count = fields.Integer(
string="Available Products Count",
compute="_compute_available_products_count",
store=False,
help="Total count of available products from all sources",
)
@api.depends("product_ids", "category_ids", "supplier_ids")
def _compute_available_products_count(self):
"""Count all available products from all sources."""
for record in self:
products = self._get_products_for_group_order(record.id)
record.available_products_count = len(products)
@api.constrains("company_id", "group_ids")
def _check_company_groups(self):
"""Validate that groups belong to the same company."""
for record in self:
for group in record.group_ids:
if group.company_id and group.company_id != record.company_id:
raise ValidationError(
f"Group {group.name} belongs to company "
f"{group.company_id.name}, not to {record.company_id.name}."
)
@api.constrains("start_date", "end_date")
def _check_dates(self):
for record in self:
if record.start_date and record.end_date:
if record.start_date > record.end_date:
raise ValidationError("Start date cannot be greater than end date")
def action_open(self):
"""Open order for purchases."""
self.write({"state": "open"})
def action_close(self):
"""Close order."""
self.write({"state": "closed"})
def action_cancel(self):
"""Cancel order."""
self.write({"state": "cancelled"})
def action_reset_to_draft(self):
"""Reset order back to draft state."""
self.write({"state": "draft"})
def get_active_orders_for_week(self):
"""Get active orders for the current week.
Respects the allowed_company_ids context if defined.
"""
today = fields.Date.today()
week_start = today - timedelta(days=today.weekday())
week_end = week_start + timedelta(days=6)
domain = [
("state", "=", "open"),
"|",
("start_date", "=", False), # No start_date = always active
("start_date", "<=", week_end),
"|",
("end_date", "=", False),
("end_date", ">=", week_start),
]
# Apply company filter if allowed_company_ids in context
if self.env.context.get("allowed_company_ids"):
domain.append(
("company_id", "in", self.env.context.get("allowed_company_ids"))
)
return self.search(domain)
@api.model
def _get_products_for_group_order(self, order_id):
"""Model helper: return product.product recordset for a given order id.
Discovery logic is owned by `group.order` so it stays close to the
order configuration. IMPORTANT: the result is the UNION of all
association sources (direct products, categories, suppliers), not a
single-branch fallback. This prevents dropping products that are
associated through multiple fields and avoids returning only one
association.
Sources included (union):
- explicit `product_ids`
- products in `category_ids` (all products whose `categ_id` matches)
- products from `supplier_ids` via `product.template.seller_ids`
Filter restrictions:
- active = True (product is not archived)
- is_published = True (product is published on website)
- sale_ok = True (product can be sold)
The returned recordset is a `product.product` set with duplicates
removed by standard recordset union semantics.
"""
order = self.browse(order_id)
if not order.exists():
return self.env["product.product"].browse()
# Common domain for all searches: active, published, and sale_ok
base_domain = [
("active", "=", True),
("product_tmpl_id.is_published", "=", True),
("product_tmpl_id.sale_ok", "=", True),
]
products = self.env["product.product"].browse()
# 1) Direct products assigned to order
if order.product_ids:
products |= order.product_ids.filtered(
lambda p: p.active
and p.product_tmpl_id.is_published
and p.product_tmpl_id.sale_ok
)
# 2) Products in categories assigned to order (including all subcategories)
if order.category_ids:
# Collect all category IDs including descendants
all_category_ids = []
def get_all_descendants(categories):
"""Recursively collect all descendant category IDs."""
for cat in categories:
all_category_ids.append(cat.id)
if cat.child_id:
get_all_descendants(cat.child_id)
get_all_descendants(order.category_ids)
# Search for products in all categories and their descendants
cat_products = self.env["product.product"].search(
[("categ_id", "in", all_category_ids)] + base_domain
)
products |= cat_products
# 3) Products from suppliers (via product.template.seller_ids)
if order.supplier_ids:
product_templates = self.env["product.template"].search(
[
("seller_ids.partner_id", "in", order.supplier_ids.ids),
("is_published", "=", True),
("sale_ok", "=", True),
]
)
supplier_products = product_templates.mapped(
"product_variant_ids"
).filtered("active")
products |= supplier_products
return products
def _get_products_paginated(self, order_id, page=1, per_page=20):
"""Get paginated products for a group order.
Args:
order_id: ID of the group order
page: Page number (1-indexed)
per_page: Number of products per page
Returns:
tuple: (products_page, total_count, has_next)
- products_page: recordset of product.product for this page
- total_count: total number of products in order
- has_next: boolean indicating if there are more pages
"""
all_products = self._get_products_for_group_order(order_id)
total_count = len(all_products)
# Calculate pagination
offset = (page - 1) * per_page
products_page = all_products[offset : offset + per_page]
has_next = offset + per_page < total_count
return products_page, total_count, has_next
@api.depends("cutoff_date", "pickup_day")
def _compute_pickup_date(self):
"""Compute pickup date as the first occurrence of pickup_day AFTER cutoff_date.
This ensures pickup always comes after cutoff, maintaining logical order.
"""
from datetime import datetime
_logger.info("_compute_pickup_date called for %d records", len(self))
for record in self:
if not record.pickup_day:
record.pickup_date = None
continue
target_weekday = int(record.pickup_day)
# Start from cutoff_date if available, otherwise from today/start_date
if record.cutoff_date:
reference_date = record.cutoff_date
else:
today = datetime.now().date()
if record.start_date and record.start_date < today:
reference_date = today
else:
reference_date = record.start_date or today
current_weekday = reference_date.weekday()
# Calculate days to NEXT occurrence of pickup_day from reference
days_ahead = target_weekday - current_weekday
if days_ahead <= 0:
days_ahead += 7
pickup_date = reference_date + timedelta(days=days_ahead)
record.pickup_date = pickup_date
_logger.info(
"Computed pickup_date for order %d: %s (pickup_day=%s, reference=%s)",
record.id,
record.pickup_date,
record.pickup_day,
reference_date,
)
@api.depends("cutoff_day", "start_date")
def _compute_cutoff_date(self):
"""Compute the cutoff date (deadline to place orders before pickup).
The cutoff date is the NEXT occurrence of cutoff_day from today.
This is when members can no longer place orders.
Example (as of Monday 2026-02-09):
- cutoff_day = 6 (Sunday) → cutoff_date = 2026-02-15 (next Sunday)
- pickup_day = 1 (Tuesday) → pickup_date = 2026-02-17 (Tuesday after cutoff)
"""
from datetime import datetime
_logger.info("_compute_cutoff_date called for %d records", len(self))
for record in self:
if record.cutoff_day:
target_weekday = int(record.cutoff_day)
today = datetime.now().date()
# Use today as reference if start_date is in the past, otherwise use start_date
if record.start_date and record.start_date < today:
reference_date = today
else:
reference_date = record.start_date or today
current_weekday = reference_date.weekday()
# Calculate days to NEXT occurrence of cutoff_day
days_ahead = target_weekday - current_weekday
if days_ahead < 0:
# Target day already passed this week
# Jump to next week's occurrence
days_ahead += 7
# If days_ahead == 0, cutoff is today (allowed)
record.cutoff_date = reference_date + timedelta(days=days_ahead)
_logger.info(
"Computed cutoff_date for order %d: %s (target_weekday=%d, current=%d, days=%d)",
record.id,
record.cutoff_date,
target_weekday,
current_weekday,
days_ahead,
)
else:
record.cutoff_date = None
@api.depends("pickup_date")
def _compute_delivery_date(self):
"""Compute delivery date as pickup date + 1 day."""
_logger.info("_compute_delivery_date called for %d records", len(self))
for record in self:
if record.pickup_date:
record.delivery_date = record.pickup_date + timedelta(days=1)
_logger.info(
"Computed delivery_date for order %d: %s",
record.id,
record.delivery_date,
)
else:
record.delivery_date = None
# === Constraints ===
@api.constrains("cutoff_day", "pickup_day", "period")
def _check_cutoff_before_pickup(self):
"""Validate that pickup_day comes after or equals cutoff_day in weekly orders.
For weekly orders, if pickup_day < cutoff_day numerically, it means pickup
would be scheduled BEFORE cutoff in the same week cycle, which is illogical.
Example:
- cutoff_day=3 (Thursday), pickup_day=1 (Tuesday): INVALID
(pickup Tuesday would be before cutoff Thursday)
- cutoff_day=1 (Tuesday), pickup_day=5 (Saturday): VALID
(pickup Saturday is after cutoff Tuesday)
- cutoff_day=5 (Saturday), pickup_day=5 (Saturday): VALID
(same day allowed)
"""
for record in self:
if record.cutoff_day and record.pickup_day and record.period == "weekly":
cutoff = int(record.cutoff_day)
pickup = int(record.pickup_day)
if pickup < cutoff:
pickup_name = dict(self._get_day_selection())[str(pickup)]
cutoff_name = dict(self._get_day_selection())[str(cutoff)]
raise ValidationError(
_(
"For weekly orders, pickup day ({pickup}) must be after or equal to "
"cutoff day ({cutoff}) in the same week. Current configuration would "
"put pickup before cutoff, which is illogical."
).format(pickup=pickup_name, cutoff=cutoff_name)
)
# === Onchange Methods ===
@api.onchange("cutoff_day", "start_date")
def _onchange_cutoff_day(self):
"""Force recompute cutoff_date on UI change for immediate feedback."""
self._compute_cutoff_date()
@api.onchange("pickup_day", "cutoff_day", "start_date")
def _onchange_pickup_day(self):
"""Force recompute pickup_date on UI change for immediate feedback."""
self._compute_pickup_date()
# === Cron Methods ===
@api.model
def _cron_update_dates(self):
"""Cron job to recalculate dates for active orders daily.
This ensures that computed dates stay up-to-date as time passes.
Only updates orders in 'draft' or 'open' states.
"""
orders = self.search([("state", "in", ["draft", "open"])])
_logger.info("Cron: Updating dates for %d active group orders", len(orders))
for order in orders:
order._compute_cutoff_date()
order._compute_pickup_date()
order._compute_delivery_date()
_logger.info("Cron: Date update completed")