325 lines
10 KiB
Python
325 lines
10 KiB
Python
# Copyright 2014 Ignacio Ibeas <ignacio@acysos.com>
|
|
# Copyright 2020 Santi Noreña - Criptomart <tech@criptomart.net>
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
|
|
|
import logging
|
|
import time
|
|
import socket
|
|
from unidecode import unidecode
|
|
|
|
from odoo import models, api, _
|
|
from odoo.exceptions import ValidationError, UserError
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class product_balance_code(models.Model):
    """Extend ``product.balance.code`` with an Epelsa scale socket driver.

    The methods below validate the key/table fields and push product and
    key assignments to a scale over a plain TCP connection.  Every message
    is framed as ``STX ... ETX <checksum> CR`` with NUL-separated fields.
    """

    _inherit = "product.balance.code"

    @api.onchange("key")
    def _onchange_key(self):
        """Reject a key that is not exactly two decimal digits.

        An empty key is accepted.

        :raises ValidationError: on a wrong length or non-digit characters.
        """
        for balance in self:
            if not balance.key:
                continue
            if len(balance.key) != 2:
                raise ValidationError(_("The key must have 2 digits"))
            if not balance.key.isdigit():
                raise ValidationError(_("Key between 0-9"))

    @api.onchange("table")
    def _onchange_table(self):
        """Reject a table that is not exactly one decimal digit.

        An empty table is accepted.

        :raises ValidationError: on a wrong length or non-digit characters.
        """
        for balance in self:
            if not balance.table:
                continue
            if len(balance.table) != 1:
                raise ValidationError(_("The table must have 1 digit"))
            # Validate the table itself; checking ``key`` here let
            # non-numeric tables through and crashed when key was unset.
            if not balance.table.isdigit():
                raise ValidationError(_("Table between 0-9"))

    def get_checksum(self, micade):
        """Compute the checksum byte for a message.

        All characters are XOR-ed together (``~`` counts as NUL), the result
        is XOR-ed with ``len(micade) + 2`` and then folded into the range
        64..127 so it is always a single ASCII character.

        :param str micade: message from STX up to and including ETX.
        :return: checksum as an integer in ``range(64, 128)``.
        :rtype: int
        """
        check = 0
        for char in micade:
            # chr(126) is treated as chr(0), i.e. it does not alter the XOR.
            if char != chr(126):
                check ^= ord(char)
        check ^= len(micade) + 2
        return (check & 63) | 64

    def recv_timeout(self, the_socket, timeout=2):
        """Read whatever the peer sends until it goes quiet.

        The socket is switched to non-blocking mode.  Reading stops when
        data has been received and ``timeout`` seconds pass without more,
        when nothing at all arrives within ``2 * timeout`` seconds, when
        the peer closes the connection, or on a socket error.

        :param the_socket: connected socket to read from.
        :param timeout: quiet period, in seconds, that ends the read.
        :return: the received data decoded as text (undecodable bytes are
            replaced), or an empty string when nothing was received.
        :rtype: str
        """
        the_socket.setblocking(False)
        chunks = []
        begin = time.time()
        while True:
            elapsed = time.time() - begin
            if chunks and elapsed > timeout:
                break
            # Nothing received at all: wait twice as long before giving up.
            if elapsed > timeout * 2:
                break
            try:
                data = the_socket.recv(8192)
            except (BlockingIOError, InterruptedError):
                # No data available yet; sleep instead of spinning.
                time.sleep(0.1)
                continue
            except socket.error as err:
                _logger.debug("Error while reading reply: %s", err)
                break
            if not data:
                # An empty read means the peer closed the connection.
                break
            _logger.debug("Received bytes: %s", list(data))
            chunks.append(data)
            # Restart the quiet-period measurement after each chunk.
            begin = time.time()
        return b"".join(chunks).decode("utf-8", errors="replace")

    def _epelsa_seal(self, message):
        """Return ``message`` with its checksum character and CR appended."""
        checksum = self.get_checksum(message)
        _logger.debug("Checksum: chr(%s) = %s", checksum, chr(checksum))
        return message + chr(checksum) + chr(13)

    def _epelsa_connect(self, sock, remote_ip, port):
        """Connect ``sock`` to the scale; return False (and log) on failure."""
        try:
            sock.connect((remote_ip, port))
        except (socket.error, OverflowError, TypeError) as err:
            # OverflowError/TypeError cover a badly configured port value.
            _logger.debug(
                "Connection to %s:%s failed: %s", remote_ip, port, err
            )
            return False
        return True

    def add_balance_epelsa_socket(self, code, balance):
        """Send a product record and its key assignment to the scale.

        Two separate connections are made: the first transmits the product
        (code, price, sale type, name, tare), the second binds the product
        to a key of a table on the scale.

        :param code: balance code record; ``product_id``, ``key`` and
            ``table`` are read from it.
        :param balance: scale record; ``ip``, ``port`` and ``name`` are
            read from it.
        :return: ``True`` on success, ``False`` if the first socket could
            not be created.
        :raises UserError: if the host cannot be resolved or reached, or if
            sending fails.
        """
        product = code.product_id
        host = balance.ip
        port = balance.port

        # ---- Product ----
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        except socket.error:
            _logger.debug("Failed to create socket")
            return False
        _logger.debug("Socket Created")

        # ``with`` guarantees the socket is closed on every exit path.
        with s:
            try:
                remote_ip = socket.gethostbyname(host)
            except socket.gaierror:
                raise UserError(
                    _("Error ! Hostname could not be resolved. Exiting")
                )

            if not self._epelsa_connect(s, remote_ip, port):
                raise UserError(_("Error ! No route to host"))
            _logger.debug("Socket Connected to %s on ip %s", host, remote_ip)

            operation = "7"
            suboperation = "0"
            prod_code = product.balance_name
            list_price = round(product.list_price, 2)
            _logger.debug("[epelsa] list_price rounded: %s ", list_price)
            # Price is sent in hundredths, zero-padded to 7 digits.
            price = "%07d" % (round(list_price * 100, 2))
            _logger.debug("[epelsa] price to send rounded: %s ", price)

            sale_type = "U" if product.not_weighed else "W"

            # Epelsa supports 25 char only name.  Transliterate first and
            # pad afterwards so the field is always exactly 25 characters
            # even when transliteration changes the length.
            name = unidecode(product.name)[:25].ljust(25)
            tare = product.tare

            nul = chr(0)
            message = chr(2) + operation + suboperation + prod_code + nul
            message += "01" + nul + prod_code[2:] + nul + "000" + nul
            message += "0000" + nul + price + nul + sale_type + nul
            message += "000" + nul + "0" + nul + name + nul
            message += str(tare).rjust(5, "0") + nul + chr(3)

            _logger.debug("Message set product: " + message)
            message = self._epelsa_seal(message)

            try:
                s.sendall(bytes(message, "utf-8"))
            except socket.error:
                _logger.debug("Send failed")
                raise UserError(_("Error! Send failed %s") % name)

            _logger.debug("Message send successfully set product : " + message)

        # ---- Key ----
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        except socket.error:
            raise UserError(_("Error! Failed to create socket %s") % name)
        _logger.debug("Socket Created")

        with s:
            try:
                remote_ip = socket.gethostbyname(host)
            except socket.gaierror:
                raise UserError(
                    _("Error! Hostname could not be resolved. Exiting %s") % name
                )

            # Previously unguarded: a refused connection surfaced as a raw
            # traceback instead of a user-facing error.
            if not self._epelsa_connect(s, remote_ip, port):
                raise UserError(_("Error! No route to host"))
            _logger.debug("Socket Connected to %s on ip %s", host, remote_ip)

            # default values for key and table to prevent errors when not
            # configured
            key = code.key if code.key else "99"
            table = "0" + code.table if code.table else "09"

            operation = "5"
            suboperation = "7"
            mode = "1"
            type_key = "1"
            bal_code = balance.name
            prod_code = product.balance_name

            nul = chr(0)
            message = chr(2) + operation + suboperation + mode + nul
            message += bal_code + nul + table + nul + type_key + nul
            message += "01" + nul + "0" + key + nul + prod_code + chr(3)

            _logger.debug("Message set key : " + message)
            message = self._epelsa_seal(message)

            try:
                s.sendall(bytes(message, "utf-8"))
            except socket.error:
                raise UserError(_("Error! Send failed %s") % name)

            _logger.debug("Message send successfully set key : " + message)

        return True

    def remove_balance_epelsa_socket(self, code, balance):
        """Ask the scale to delete the product referenced by ``code``.

        After sending, the scale's reply is read with :meth:`recv_timeout`
        and written to the debug log.

        :param code: balance code record; ``product_id`` is read from it.
        :param balance: scale record; ``ip`` and ``port`` are read from it.
        :return: ``True`` on success.
        :raises UserError: if the socket cannot be created, the host cannot
            be resolved or reached, or sending fails.
        """
        # Only used to identify the product in error messages.
        name = unidecode(code.product_id.name.ljust(25))

        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        except socket.error:
            raise UserError(_("Error! Failed to create socket %s") % name)
        _logger.debug("Socket Created")

        host = balance.ip
        port = balance.port

        # ``with`` guarantees the socket is closed on every exit path.
        with s:
            try:
                remote_ip = socket.gethostbyname(host)
            except socket.gaierror:
                raise UserError(
                    _("Error! Hostname could not be resolved. Exiting %s") % name
                )

            if not self._epelsa_connect(s, remote_ip, port):
                raise UserError(_("Error! No route to host"))
            _logger.debug("Socket Connected to %s on ip %s", host, remote_ip)

            operation = "7"
            suboperation = "2"
            mode = "1"
            prod_code = code.product_id.balance_name

            message = chr(2) + operation + suboperation + mode + chr(0)
            message += prod_code + chr(3)

            _logger.debug("Message remove product : " + message)
            message = self._epelsa_seal(message)

            try:
                s.sendall(bytes(message, "utf-8"))
            except socket.error:
                raise UserError(_("Error! Send failed {}").format(name))

            _logger.debug(
                "Message send successfully remove product : " + message
            )

            reply = self.recv_timeout(s)
            _logger.debug("Reply remove product : %s", reply)

        return True

    def update_balance_epelsa_socket(self, code, balance):
        """Update a product on the scale by sending it again.

        Delegates to :meth:`add_balance_epelsa_socket` and returns its
        result.
        """
        _logger.debug("updating balance")
        return self.add_balance_epelsa_socket(code, balance)
|
|
|
|
|
|
class ProductTemplate(models.Model):
    """Add scale-code and tare validation to ``product.template``."""

    _inherit = "product.template"

    @api.onchange("balance_name")
    def _onchange_name(self):
        """Reject a scale code that is not exactly six decimal digits.

        An empty code is accepted.

        :raises ValidationError: on a wrong length or non-digit characters.
        """
        for product in self:
            balance_name = product.balance_name
            if not balance_name:
                continue
            if len(balance_name) != 6:
                raise ValidationError(_("The code must have 6 digits"))
            if not balance_name.isdigit():
                raise ValidationError(_("Code between 0-9"))

    @api.constrains("tare")
    def _check_tare(self):
        """Ensure the tare is at most 99999 and a multiple of 5.

        :raises ValidationError: if either condition is violated.
        """
        # Constraints can run on several records at once; reading
        # ``self.tare`` directly fails on a multi-record recordset.
        for product in self:
            tare = product.tare
            if tare > 99999:
                raise ValidationError(_("Tare must be maximum 99999"))
            if tare % 5 != 0:
                raise ValidationError(_("Tare must be multiple of 5"))
|
|
|
|
|
|
class ProductProduct(models.Model):
    """Add scale-code validation to ``product.product``."""

    _inherit = "product.product"

    @api.onchange("balance_name")
    def _onchange_name(self):
        """Reject a scale code that is not exactly six decimal digits.

        An empty code is accepted, matching the ``product.template`` check;
        without this guard an unset field crashed on ``len()``.

        :raises ValidationError: on a wrong length or non-digit characters.
        """
        for product in self:
            balance_name = product.balance_name
            if not balance_name:
                continue
            if len(balance_name) != 6:
                raise ValidationError(_("The code must have 6 digits"))
            if not balance_name.isdigit():
                raise ValidationError(_("Code between 0-9"))
|