addons-cm/website_sale_aplicoop/models/group_order.py

723 lines
26 KiB
Python

# Copyright 2025-Today Criptomart
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl)
import logging
from datetime import timedelta
from odoo import api
from odoo import fields
from odoo import models
from odoo.exceptions import ValidationError
_logger = logging.getLogger(__name__)
class GroupOrder(models.Model):
    """Consumer group order model, registered as ``group.order``."""

    _name = "group.order"
    _description = "Consumer Group Order"
    # Inherits the chatter and activity models named below.
    _inherit = ["mail.thread", "mail.activity.mixin"]
    # Default ordering: ascending sequence, then newest start date first.
    _order = "sequence, start_date desc"
def _get_order_type_selection(self):
"""Return order type selection options with translations."""
return [
("regular", self.env._("Regular Order")),
("special", self.env._("Special Order")),
("promotional", self.env._("Promotional Order")),
]
def _get_period_selection(self):
"""Return period selection options with translations."""
return [
("once", self.env._("One-time")),
("weekly", self.env._("Weekly")),
("biweekly", self.env._("Biweekly")),
("monthly", self.env._("Monthly")),
]
def _get_day_selection(self):
"""Return day of week selection options with translations."""
return [
("0", self.env._("Monday")),
("1", self.env._("Tuesday")),
("2", self.env._("Wednesday")),
("3", self.env._("Thursday")),
("4", self.env._("Friday")),
("5", self.env._("Saturday")),
("6", self.env._("Sunday")),
]
def _get_state_selection(self):
"""Return state selection options with translations."""
return [
("draft", self.env._("Draft")),
("open", self.env._("Open")),
("closed", self.env._("Closed")),
("cancelled", self.env._("Cancelled")),
]
# === Multi-company ===
# Owning company; defaults to the company of the current environment.
company_id = fields.Many2one(
    "res.company",
    required=True,
    default=lambda self: self.env.company,
    tracking=True,
    help="Company that owns this consumer group order",
)
# === Sequence ===
# First key of the model's default ordering ("sequence, start_date desc").
sequence = fields.Integer(
    default=10,
    help="Sequence for ordering group orders in the website list",
)
# === Basic fields ===
name = fields.Char(
    required=True,
    tracking=True,
    translate=True,
    help="Display name of this consumer group order",
)
# Partners flagged with `is_group`, linked through the
# `group_order_group_rel` relation table.
group_ids = fields.Many2many(
    "res.partner",
    "group_order_group_rel",
    "order_id",
    "group_id",
    required=True,
    domain=[("is_group", "=", True)],
    tracking=True,
    help="Consumer groups that can participate in this order",
)
# Selection values are produced by `_get_order_type_selection`.
type = fields.Selection(
    selection=_get_order_type_selection,
    required=True,
    default="regular",
    tracking=True,
    help="Type of consumer group order: Regular, Special (one-time), or Promotional",
)
# === Dates ===
# Both dates are optional; the date constraint only compares them when both
# are set.
start_date = fields.Date(
    required=False,
    tracking=True,
    help="Day when the consumer group order opens for purchases",
)
end_date = fields.Date(
    required=False,
    tracking=True,
    help="If empty, the consumer group order is permanent",
)
# === Period and days ===
# Selection values are produced by `_get_period_selection`.
period = fields.Selection(
    selection=_get_period_selection,
    required=True,
    default="weekly",
    tracking=True,
    help="How often this consumer group order repeats",
)
# Weekdays are stored as string indexes "0" (Monday) to "6" (Sunday),
# as produced by `_get_day_selection`.
pickup_day = fields.Selection(
    selection=_get_day_selection,
    required=False,
    tracking=True,
    help="Day of the week when members pick up their orders",
)
cutoff_day = fields.Selection(
    selection=_get_day_selection,
    required=False,
    tracking=True,
    help="Day when purchases stop and the consumer group order is locked for this week.",
)
# === Home delivery ===
home_delivery = fields.Boolean(
    default=False,
    tracking=True,
    help="Whether this consumer group order includes home delivery service",
)
# Restricted to service-type products by the domain.
delivery_product_id = fields.Many2one(
    "product.product",
    domain=[("type", "=", "service")],
    tracking=True,
    help="Product to use for home delivery (service type)",
)
# Stored computed field, filled by `_compute_delivery_date`.
delivery_date = fields.Date(
    compute="_compute_delivery_date",
    store=True,
    readonly=True,
    help="Calculated delivery date (pickup date + 1 day)",
)
# === Computed date fields ===
# Stored computed field, filled by `_compute_pickup_date`.
pickup_date = fields.Date(
    compute="_compute_pickup_date",
    store=True,
    readonly=True,
    help="Calculated next occurrence of pickup day",
)
# Stored computed field, filled by `_compute_cutoff_date`.
cutoff_date = fields.Date(
    compute="_compute_cutoff_date",
    store=True,
    readonly=True,
    help="Calculated next occurrence of cutoff day",
)
# === Associations ===
# Whitelist sources: `_get_products_for_group_order` takes the union of
# suppliers, direct products and categories.
supplier_ids = fields.Many2many(
    "res.partner",
    "group_order_supplier_rel",
    "order_id",
    "supplier_id",
    domain=[("supplier_rank", ">", 0)],
    tracking=True,
    help="Products from these suppliers will be available.",
)
product_ids = fields.Many2many(
    "product.product",
    "group_order_product_rel",
    "order_id",
    "product_id",
    tracking=True,
    help="Directly assigned products.",
)
category_ids = fields.Many2many(
    "product.category",
    "group_order_category_rel",
    "order_id",
    "category_id",
    tracking=True,
    help="Products in these categories will be available",
)
# Blacklist sources: `_get_products_for_group_order` subtracts these after
# the whitelist union has been built.
excluded_product_ids = fields.Many2many(
    "product.product",
    "group_order_excluded_product_rel",
    "order_id",
    "product_id",
    tracking=True,
    help="Products explicitly excluded from this order (blacklist has absolute priority)",
)
excluded_supplier_ids = fields.Many2many(
    "res.partner",
    "group_order_excluded_supplier_rel",
    "order_id",
    "supplier_id",
    domain=[("supplier_rank", ">", 0)],
    tracking=True,
    help="Suppliers excluded from this order. Products with these suppliers as main seller will not be available (blacklist has absolute priority)",
)
excluded_category_ids = fields.Many2many(
    "product.category",
    "group_order_excluded_category_rel",
    "order_id",
    "category_id",
    tracking=True,
    help="Categories excluded from this order. Products in these categories and all their subcategories will not be available (blacklist has absolute priority)",
)
# === State ===
# Selection values are produced by `_get_state_selection`; the `action_*`
# methods write this field.
state = fields.Selection(
    selection=_get_state_selection,
    default="draft",
    tracking=True,
)
# === Description and image ===
description = fields.Text(
    translate=True,
    help="Free text description for this consumer group order",
)
delivery_notice = fields.Text(
    translate=True,
    help="Notice about home delivery displayed to users (shown when home delivery is enabled)",
)
image = fields.Binary(
    help="Image displayed alongside the consumer group order name",
    attachment=True,
)
# Stored computed field, filled by `_compute_display_image`.
display_image = fields.Binary(
    compute="_compute_display_image",
    store=True,
    help="Image to display: uses consumer group order image if set, otherwise group image",
    attachment=True,
)
# "group_ids.image_1920" is declared because the fallback branch reads it:
# without it the stored value is not refreshed when a group's image changes.
@api.depends("image", "group_ids", "group_ids.image_1920")
def _compute_display_image(self):
    """Fill ``display_image`` for every record in ``self``.

    Priority:
      1. the order's own ``image``;
      2. ``image_1920`` of the first record in ``group_ids``;
      3. ``False`` when neither is available.
    """
    for record in self:
        # Slicing keeps this safe when no group is linked: the result is an
        # empty recordset whose field value is falsy.
        first_group = record.group_ids[:1]
        record.display_image = record.image or first_group.image_1920 or False
# Non-stored counter filled by `_compute_available_products_count`.
available_products_count = fields.Integer(
    compute="_compute_available_products_count",
    store=False,
    help="Total count of available products from all sources",
)
@api.depends(
    "product_ids",
    "category_ids",
    "supplier_ids",
    "excluded_product_ids",
    "excluded_supplier_ids",
    "excluded_category_ids",
)
def _compute_available_products_count(self):
    """Store, per order, how many products the order currently exposes.

    The product set comes from ``_get_products_for_group_order`` so the
    count follows the same whitelist/blacklist rules.
    """
    for order in self:
        order.available_products_count = len(
            self._get_products_for_group_order(order.id)
        )
@api.constrains("company_id", "group_ids")
def _check_company_groups(self):
    """Reject groups bound to a company other than the order's company.

    Groups without a company are accepted.

    Raises:
        ValidationError: on the first group whose company differs.
    """
    for order in self:
        order_company = order.company_id
        for group in order.group_ids:
            group_company = group.company_id
            if not group_company or group_company == order_company:
                continue
            message = self.env._(
                "Group %(group)s belongs to company %(group_company)s, "
                "not to %(record_company)s.",
                group=group.name,
                group_company=group_company.name,
                record_company=order_company.name,
            )
            raise ValidationError(message)
@api.constrains("start_date", "end_date")
def _check_dates(self):
    """Ensure ``start_date`` is not later than ``end_date``.

    Records missing either date are skipped.

    Raises:
        ValidationError: when both dates are set and start is after end.
    """
    for order in self:
        begins, ends = order.start_date, order.end_date
        if not (begins and ends):
            continue
        if begins > ends:
            raise ValidationError(
                self.env._("Start date cannot be greater than end date")
            )
def action_open(self):
    """Set ``state`` to ``open`` on every record in ``self``."""
    new_values = {"state": "open"}
    self.write(new_values)
def action_close(self):
    """Set ``state`` to ``closed`` on every record in ``self``."""
    new_values = {"state": "closed"}
    self.write(new_values)
def action_cancel(self):
    """Set ``state`` to ``cancelled`` on every record in ``self``."""
    new_values = {"state": "cancelled"}
    self.write(new_values)
def action_reset_to_draft(self):
    """Set ``state`` back to ``draft`` on every record in ``self``."""
    new_values = {"state": "draft"}
    self.write(new_values)
def get_active_orders_for_week(self):
    """Return the open orders whose date range overlaps the current week.

    The week runs from Monday to Sunday around today's date. An order
    without ``start_date`` counts as already started and one without
    ``end_date`` as never ending. When the context carries
    ``allowed_company_ids``, results are limited to those companies.
    """
    today = fields.Date.today()
    monday = today - timedelta(days=today.weekday())
    sunday = monday + timedelta(days=6)

    # Started on or before the end of the week (or no start date at all).
    started = ["|", ("start_date", "=", False), ("start_date", "<=", sunday)]
    # Still running at the beginning of the week (or no end date at all).
    not_ended = ["|", ("end_date", "=", False), ("end_date", ">=", monday)]
    domain = [("state", "=", "open")] + started + not_ended

    company_ids = self.env.context.get("allowed_company_ids")
    if company_ids:
        domain.append(("company_id", "in", company_ids))
    return self.search(domain)
@api.model
def _get_products_for_group_order(self, order_id):
    """Return the ``product.product`` recordset available for an order.

    The result is the UNION of every whitelist source, not a single-branch
    fallback, so products linked through several fields are never dropped:

    - explicit ``product_ids``
    - products whose ``categ_id`` is in ``category_ids`` or any descendant
    - variants of templates sold by ``supplier_ids`` (``seller_ids``)

    Every source is restricted to products that are active, published on
    the website and ``sale_ok``.

    Blacklists are then subtracted, in this order: ``excluded_product_ids``,
    products whose template ``main_seller_id`` is in
    ``excluded_supplier_ids``, and products whose category is in
    ``excluded_category_ids`` or any descendant.

    Args:
        order_id: id of the ``group.order`` record.

    Returns:
        ``product.product`` recordset sorted with in-stock products first,
        then by template ``website_sequence`` and lower-cased name. Empty
        recordset when the order does not exist.
    """
    Product = self.env["product.product"]
    order = self.browse(order_id)
    if not order.exists():
        return Product.browse()

    def with_descendants(categories):
        """Return the ids of ``categories`` and of all categories below them."""
        collected = set()
        pending = categories
        while pending:
            collected.update(pending.ids)
            # Walk one level down; skip ids already seen so a malformed
            # hierarchy cannot loop forever.
            pending = pending.child_id.filtered(lambda c: c.id not in collected)
        return collected

    # Common domain for all searches: active, published, and sale_ok
    base_domain = [
        ("active", "=", True),
        ("product_tmpl_id.is_published", "=", True),
        ("product_tmpl_id.sale_ok", "=", True),
    ]
    products = Product.browse()

    # 1) Direct products assigned to order
    if order.product_ids:
        products |= order.product_ids.filtered(
            lambda p: p.active
            and p.product_tmpl_id.is_published
            and p.product_tmpl_id.sale_ok
        )

    # 2) Products in categories assigned to order (including all subcategories)
    if order.category_ids:
        category_ids = with_descendants(order.category_ids)
        products |= Product.search(
            [("categ_id", "in", list(category_ids))] + base_domain
        )

    # 3) Products from suppliers (via product.template.seller_ids)
    if order.supplier_ids:
        product_templates = self.env["product.template"].search(
            [
                ("seller_ids.partner_id", "in", order.supplier_ids.ids),
                ("is_published", "=", True),
                ("sale_ok", "=", True),
            ]
        )
        products |= product_templates.mapped("product_variant_ids").filtered(
            "active"
        )

    # NOTE: the log calls below format the order id with %s rather than %d.
    # When this helper is reached from a form onchange the id is a NewId
    # placeholder, which %d cannot format.

    # 4) Apply product blacklist filter (absolute priority)
    if order.excluded_product_ids:
        excluded_count = len(products & order.excluded_product_ids)
        products = products - order.excluded_product_ids
        _logger.info(
            "Group order %s: Excluded %d products from product blacklist (total: %d)",
            order.id,
            excluded_count,
            len(products),
        )

    # 5) Apply supplier blacklist filter (absolute priority):
    # drop products whose main seller is an excluded supplier.
    if order.excluded_supplier_ids:
        excluded_by_supplier = products.filtered(
            lambda p: p.product_tmpl_id.main_seller_id
            and p.product_tmpl_id.main_seller_id in order.excluded_supplier_ids
        )
        if excluded_by_supplier:
            products = products - excluded_by_supplier
            _logger.info(
                "Group order %s: Excluded %d products from supplier blacklist (main sellers: %s) (total: %d)",
                order.id,
                len(excluded_by_supplier),
                ", ".join(order.excluded_supplier_ids.mapped("name")),
                len(products),
            )

    # 6) Apply category blacklist filter (absolute priority):
    # drop products in excluded categories and all their subcategories.
    if order.excluded_category_ids:
        # A set keeps the per-product membership test O(1).
        excluded_cat_ids = with_descendants(order.excluded_category_ids)
        excluded_by_category = products.filtered(
            lambda p: p.categ_id.id in excluded_cat_ids
        )
        if excluded_by_category:
            products = products - excluded_by_category
            _logger.info(
                "Group order %s: Excluded %d products from category blacklist (categories: %s, including subcategories) (total: %d)",
                order.id,
                len(excluded_by_category),
                ", ".join(order.excluded_category_ids.mapped("name")),
                len(products),
            )

    # In-stock first: is_out_of_stock is Boolean and False sorts before True.
    # Within each group keep website sequence, then case-insensitive name.
    return products.sorted(
        lambda p: (
            p.is_out_of_stock,
            p.product_tmpl_id.website_sequence,
            (p.name or "").lower(),
        )
    )
def _get_products_paginated(self, order_id, page=1, per_page=20):
"""Get paginated products for a group order.
Args:
order_id: ID of the group order
page: Page number (1-indexed)
per_page: Number of products per page
Returns:
tuple: (products_page, total_count, has_next)
- products_page: recordset of product.product for this page
- total_count: total number of products in order
- has_next: boolean indicating if there are more pages
"""
all_products = self._get_products_for_group_order(order_id)
total_count = len(all_products)
# Calculate pagination
offset = (page - 1) * per_page
products_page = all_products[offset : offset + per_page]
has_next = offset + per_page < total_count
return products_page, total_count, has_next
# "start_date" is declared because the fallback branch (no cutoff_date)
# reads it.
@api.depends("cutoff_date", "pickup_day", "start_date")
def _compute_pickup_date(self):
    """Compute ``pickup_date`` as the next ``pickup_day`` after a reference date.

    The reference date is ``cutoff_date`` when set, so pickup always falls
    strictly after cutoff. Otherwise it is ``start_date`` when that is
    today or later, else today. When the reference already falls on
    ``pickup_day`` the pickup is scheduled a full week later.

    Records without ``pickup_day`` get an empty ``pickup_date``.
    """
    _logger.info("_compute_pickup_date called for %d records", len(self))
    today = fields.Date.today()
    for record in self:
        if not record.pickup_day:
            record.pickup_date = None
            continue

        target_weekday = int(record.pickup_day)
        if record.cutoff_date:
            reference_date = record.cutoff_date
        elif record.start_date and record.start_date >= today:
            reference_date = record.start_date
        else:
            reference_date = today

        # Days until the NEXT occurrence; zero or negative means the target
        # weekday is the reference day or earlier, so move to next week.
        days_ahead = target_weekday - reference_date.weekday()
        if days_ahead <= 0:
            days_ahead += 7
        record.pickup_date = reference_date + timedelta(days=days_ahead)

        # %s for the id: records handled during onchange carry a NewId
        # placeholder that %d cannot format.
        _logger.info(
            "Computed pickup_date for order %s: %s (pickup_day=%s, reference=%s)",
            record.id,
            record.pickup_date,
            record.pickup_day,
            reference_date,
        )
@api.depends("cutoff_day", "start_date")
def _compute_cutoff_date(self):
    """Compute the cutoff date (deadline to place orders before pickup).

    The cutoff date is the next occurrence of ``cutoff_day`` counted from a
    reference date: ``start_date`` when it is today or later, otherwise
    today. A cutoff falling on the reference date itself is allowed.

    Example (as of Monday 2026-02-09, no future start date):
        - cutoff_day = 6 (Sunday) -> cutoff_date = 2026-02-15

    Records without ``cutoff_day`` get an empty ``cutoff_date``.
    """
    _logger.info("_compute_cutoff_date called for %d records", len(self))
    today = fields.Date.today()
    for record in self:
        if not record.cutoff_day:
            record.cutoff_date = None
            continue

        target_weekday = int(record.cutoff_day)
        start = record.start_date
        reference_date = start if start and start >= today else today
        current_weekday = reference_date.weekday()

        days_ahead = target_weekday - current_weekday
        if days_ahead < 0:
            # Target day already passed this week: use next week's occurrence.
            days_ahead += 7
        # days_ahead == 0 means the cutoff is the reference day (allowed).
        record.cutoff_date = reference_date + timedelta(days=days_ahead)

        # %s for the id: records handled during onchange carry a NewId
        # placeholder that %d cannot format.
        _logger.info(
            "Computed cutoff_date for order %s: %s (target_weekday=%d, current=%d, days=%d)",
            record.id,
            record.cutoff_date,
            target_weekday,
            current_weekday,
            days_ahead,
        )
@api.depends("pickup_date")
def _compute_delivery_date(self):
    """Compute ``delivery_date`` as ``pickup_date`` plus one day.

    Records without ``pickup_date`` get an empty ``delivery_date``.
    """
    _logger.info("_compute_delivery_date called for %d records", len(self))
    for record in self:
        if not record.pickup_date:
            record.delivery_date = None
            continue
        record.delivery_date = record.pickup_date + timedelta(days=1)
        # %s for the id: records handled during onchange carry a NewId
        # placeholder that %d cannot format.
        _logger.info(
            "Computed delivery_date for order %s: %s",
            record.id,
            record.delivery_date,
        )
# === Constraints ===
@api.constrains("cutoff_day", "pickup_day", "period")
def _check_cutoff_before_pickup(self):
    """Ensure weekly orders do not place pickup before cutoff in the week.

    Only records with ``period == "weekly"`` and both days set are checked.
    Days are compared by weekday index (0 = Monday ... 6 = Sunday):

    - cutoff Thursday (3), pickup Tuesday (1): rejected
    - cutoff Tuesday (1), pickup Saturday (5): accepted
    - cutoff Saturday (5), pickup Saturday (5): accepted (same day)

    Raises:
        ValidationError: when the pickup index is lower than the cutoff one.
    """
    for order in self:
        if order.period != "weekly" or not (order.cutoff_day and order.pickup_day):
            continue
        cutoff_index = int(order.cutoff_day)
        pickup_index = int(order.pickup_day)
        if pickup_index >= cutoff_index:
            continue
        day_names = dict(self._get_day_selection())
        raise ValidationError(
            self.env._(
                "For weekly orders, pickup day (%(pickup)s) must be after or equal to "
                "cutoff day (%(cutoff)s) in the same week. Current configuration would "
                "put pickup before cutoff, which is illogical.",
                pickup=day_names[str(pickup_index)],
                cutoff=day_names[str(cutoff_index)],
            )
        )
# === Onchange Methods ===
@api.onchange("cutoff_day", "start_date")
def _onchange_cutoff_day(self):
    """Refresh ``cutoff_date`` in the form as soon as its inputs change."""
    recompute_cutoff = self._compute_cutoff_date
    recompute_cutoff()
@api.onchange("pickup_day", "cutoff_day", "start_date")
def _onchange_pickup_day(self):
    """Refresh ``pickup_date`` in the form as soon as its inputs change."""
    recompute_pickup = self._compute_pickup_date
    recompute_pickup()
# === Cron Methods ===
@api.model
def _cron_update_dates(self):
    """Daily cron entry point: refresh dates of draft and open orders.

    For each matching order, in turn: recompute the cutoff, pickup and
    delivery dates, then confirm its linked sale orders.
    """
    active_states = ["draft", "open"]
    active_orders = self.search([("state", "in", active_states)])
    _logger.info(
        "Cron: Updating dates for %d active group orders", len(active_orders)
    )
    for group_order in active_orders:
        # Order matters: pickup is derived from cutoff, delivery from pickup.
        steps = (
            group_order._compute_cutoff_date,
            group_order._compute_pickup_date,
            group_order._compute_delivery_date,
            group_order._confirm_linked_sale_orders,
        )
        for step in steps:
            step()
    _logger.info("Cron: Date update completed")
def _confirm_linked_sale_orders(self):
    """Confirm draft/sent sale orders linked to this group order.

    Called by the daily cron so that orders generated from the website are
    confirmed automatically once dates are refreshed. The search and the
    confirmation run with ``sudo()``.

    Confirmation is best effort: any exception is logged and swallowed so
    the cron can continue with the next group order. The call runs inside a
    savepoint so that a failure rolls back its partial writes and leaves
    the cron transaction usable.
    """
    self.ensure_one()
    SaleOrder = self.env["sale.order"].sudo()
    sale_orders = SaleOrder.search(
        [
            ("group_order_id", "=", self.id),
            ("state", "in", ["draft", "sent"]),
        ]
    )
    if not sale_orders:
        _logger.info(
            "Cron: No sale orders to confirm for group order %s (%s)",
            self.id,
            self.name,
        )
        return

    _logger.info(
        "Cron: Confirming %d sale orders for group order %s (%s)",
        len(sale_orders),
        self.id,
        self.name,
    )
    try:
        # Without the savepoint, a failed confirmation would leave
        # half-applied changes (or an aborted SQL transaction) behind for
        # the remaining group orders of the same cron run.
        with self.env.cr.savepoint():
            sale_orders.action_confirm()
    except Exception:
        _logger.exception(
            "Cron: Error confirming sale orders for group order %s (%s)",
            self.id,
            self.name,
        )